mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-30 05:30:51 +00:00
Storage getDataPath -> getDataPaths
This commit is contained in:
parent
fab2c64110
commit
0441480502
@ -305,8 +305,8 @@ public:
|
||||
/** Notify engine about updated dependencies for this storage. */
|
||||
virtual void updateDependencies() {}
|
||||
|
||||
/// Returns data path if storage supports it, empty string otherwise.
|
||||
virtual String getDataPath() const { return {}; }
|
||||
/// Returns data path if storage supports it, empty vector otherwise.
|
||||
virtual Strings getDataPaths() const { return {}; }
|
||||
|
||||
/// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none.
|
||||
virtual ASTPtr getPartitionKeyAST() const { return nullptr; }
|
||||
|
@ -200,8 +200,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
|
||||
|
||||
String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
|
||||
String part_path = data.getFullPathForPart(0);
|
||||
String absolute_part_path = part_path + relative_part_path + "/"; ///@TODO_IGR ASK path for file
|
||||
auto reservation = data.reserveSpaceForPart(0); ///@TODO_IGR ASK What size should be there?
|
||||
String part_path = reservation->getPath();
|
||||
String absolute_part_path = part_path + relative_part_path + "/";
|
||||
Poco::File part_file(absolute_part_path);
|
||||
|
||||
if (part_file.exists())
|
||||
|
@ -150,8 +150,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_
|
||||
data.settings.max_bytes_to_merge_at_max_space_in_pool,
|
||||
static_cast<double>(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge);
|
||||
|
||||
///@TODO_IGR ASK what path?
|
||||
return std::min(max_size, static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_paths[0]) / DISK_USAGE_COEFFICIENT_TO_SELECT));
|
||||
return std::min(max_size, static_cast<UInt64>(DiskSpaceMonitor::getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
|
||||
}
|
||||
|
||||
|
||||
|
@ -162,7 +162,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
part_name = new_part_info.getPartName();
|
||||
|
||||
size_t expected_size = block.bytes();
|
||||
String part_absolute_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK expected size
|
||||
auto reservation = data.reserveSpaceForPart(expected_size); ///@TODO_IGR ASK expected size
|
||||
String part_absolute_path = reservation->getPath();
|
||||
|
||||
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_absolute_path, part_name, new_part_info);
|
||||
new_data_part->partition = std::move(partition);
|
||||
|
@ -233,7 +233,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
|
||||
zk_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);
|
||||
|
||||
checkDataPart(
|
||||
storage.data.getFullPath() + part_name,
|
||||
storage.data.getFullPaths()[0] + part_name, ///@TODO_IGR ASK what should we do there? Should we check all paths in checkDataPart?
|
||||
storage.data.index_granularity,
|
||||
true,
|
||||
storage.data.primary_key_data_types,
|
||||
|
@ -32,8 +32,8 @@ void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts &
|
||||
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
current_parts.add(part->name);
|
||||
virtual_parts.add(part->name);
|
||||
current_parts.add("/", part->name);
|
||||
virtual_parts.add("/", part->name);
|
||||
}
|
||||
}
|
||||
|
||||
@ -122,7 +122,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
|
||||
{
|
||||
for (const String & virtual_part_name : entry->getVirtualPartNames())
|
||||
{
|
||||
virtual_parts.add(virtual_part_name);
|
||||
virtual_parts.add("/", virtual_part_name);
|
||||
updateMutationsPartsToDo(virtual_part_name, /* add = */ true);
|
||||
}
|
||||
|
||||
@ -192,13 +192,13 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
|
||||
{
|
||||
for (const String & virtual_part_name : entry->getVirtualPartNames())
|
||||
{
|
||||
Strings replaced_parts;
|
||||
current_parts.add(virtual_part_name, &replaced_parts);
|
||||
ActiveDataPartSet::PartPathNames replaced_parts;
|
||||
current_parts.add(String("/"), virtual_part_name, &replaced_parts);
|
||||
|
||||
/// Each part from `replaced_parts` should become Obsolete as a result of executing the entry.
|
||||
/// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion()
|
||||
for (const String & replaced_part_name : replaced_parts)
|
||||
updateMutationsPartsToDo(replaced_part_name, /* add = */ false);
|
||||
for (const auto & replaced_part : replaced_parts)
|
||||
updateMutationsPartsToDo(replaced_part.name, /* add = */ false);
|
||||
}
|
||||
|
||||
String drop_range_part_name;
|
||||
@ -539,9 +539,9 @@ static size_t countPartsToMutate(
|
||||
/// because they are not consecutive in `parts`.
|
||||
MergeTreePartInfo covering_part_info(
|
||||
partition_id, 0, block_num, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
|
||||
for (const String & covered_part_name : parts.getPartsCoveredBy(covering_part_info))
|
||||
for (const auto & covered_part : parts.getPartsCoveredBy(covering_part_info))
|
||||
{
|
||||
auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion());
|
||||
auto part_info = MergeTreePartInfo::fromPartName(covered_part.name, parts.getFormatVersion());
|
||||
if (part_info.getDataVersion() < block_num)
|
||||
++count;
|
||||
}
|
||||
@ -1306,7 +1306,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
||||
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
|
||||
{
|
||||
std::lock_guard lock(state_mutex);
|
||||
virtual_parts.add(part_name);
|
||||
virtual_parts.add("/", part_name);
|
||||
}
|
||||
|
||||
|
||||
@ -1572,7 +1572,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
|
||||
return false;
|
||||
}
|
||||
|
||||
if (prev_virtual_parts.getContainingPart(part->info).empty())
|
||||
if (prev_virtual_parts.getContainingPart(part->info).name.empty())
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
|
||||
@ -1610,7 +1610,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
|
||||
{
|
||||
/// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer
|
||||
/// and it is guaranteed that it will contain all merges assigned before this object is constructed.
|
||||
String containing_part = queue.virtual_parts.getContainingPart(part->info);
|
||||
String containing_part = queue.virtual_parts.getContainingPart(part->info).name;
|
||||
if (containing_part != part->name)
|
||||
{
|
||||
if (out_reason)
|
||||
@ -1625,7 +1625,11 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
|
||||
left->info.partition_id, left_max_block + 1, right_min_block - 1,
|
||||
MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
|
||||
|
||||
Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
|
||||
auto tmp = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
|
||||
Strings covered;
|
||||
for (auto & elem : tmp) {
|
||||
covered.push_back(elem.name);
|
||||
}
|
||||
if (!covered.empty())
|
||||
{
|
||||
if (out_reason)
|
||||
@ -1667,7 +1671,7 @@ std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersio
|
||||
|
||||
std::lock_guard lock(queue.state_mutex);
|
||||
|
||||
if (queue.virtual_parts.getContainingPart(part->info) != part->name)
|
||||
if (queue.virtual_parts.getContainingPart(part->info).name != part->name)
|
||||
return {};
|
||||
|
||||
auto in_partition = queue.mutations_by_partition.find(part->info.partition_id);
|
||||
|
@ -88,7 +88,7 @@ public:
|
||||
void startup() override;
|
||||
void shutdown() override;
|
||||
|
||||
String getDataPath() const override { return path; }
|
||||
Strings getDataPaths() const override { return {path}; }
|
||||
|
||||
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
|
||||
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
|
||||
|
@ -47,7 +47,7 @@ public:
|
||||
|
||||
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
|
||||
|
||||
String getDataPath() const override { return path; }
|
||||
Strings getDataPaths() const override { return {path}; }
|
||||
|
||||
protected:
|
||||
friend class StorageFileBlockInputStream;
|
||||
|
@ -44,7 +44,7 @@ public:
|
||||
|
||||
std::string full_path() const { return path + escapeForFileName(name) + '/';}
|
||||
|
||||
String getDataPath() const override { return full_path(); }
|
||||
Strings getDataPaths() const override { return {full_path()}; }
|
||||
|
||||
protected:
|
||||
/** Attach the table with the appropriate name, along the appropriate path (with / at the end),
|
||||
|
@ -283,10 +283,10 @@ StoragePtr StorageMaterializedView::tryGetTargetTable() const
|
||||
return global_context.tryGetTable(target_database_name, target_table_name);
|
||||
}
|
||||
|
||||
String StorageMaterializedView::getDataPath() const
|
||||
Strings StorageMaterializedView::getDataPaths() const
|
||||
{
|
||||
if (auto table = tryGetTargetTable())
|
||||
return table->getDataPath();
|
||||
return table->getDataPaths();
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -57,7 +57,7 @@ public:
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
String getDataPath() const override;
|
||||
Strings getDataPaths() const override;
|
||||
|
||||
private:
|
||||
String select_database_name;
|
||||
|
@ -62,10 +62,10 @@ StorageMergeTree::StorageMergeTree(
|
||||
const MergeTreeData::MergingParams & merging_params_,
|
||||
const MergeTreeSettings & settings_,
|
||||
bool has_force_restore_data_flag)
|
||||
: path(path_), database_name(database_name_), table_name(table_name_), full_paths{path + escapeForFileName(table_name) + '/', "/mnt/data/Data2/" + escapeForFileName(table_name) + '/'},
|
||||
: path(path_), database_name(database_name_), table_name(table_name_),
|
||||
global_context(context_), background_pool(context_.getBackgroundPool()),
|
||||
data(database_name, table_name,
|
||||
Schema(std::vector<Strings>{full_paths}), columns_, indices_, ///@TODO_IGR generate Schema from config
|
||||
Schema(std::vector<Strings>{{path, "/mnt/data/Data2/"}}), columns_, indices_, ///@TODO_IGR generate Schema from config
|
||||
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
|
||||
sample_by_ast_, merging_params_, settings_, false, attach),
|
||||
reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()),
|
||||
@ -183,9 +183,9 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne
|
||||
|
||||
data.setPath(new_full_path);
|
||||
|
||||
path = new_path_to_db;
|
||||
path = new_path_to_db; ///@TODO_IGR ASK path? table_name?
|
||||
table_name = new_table_name;
|
||||
full_paths = {new_full_path}; ///TODO_IGR ASK rename?
|
||||
//full_paths = {new_full_path}; ///@TODO_IGR ASK rename?
|
||||
|
||||
/// NOTE: Logger names are not updated.
|
||||
}
|
||||
@ -336,6 +336,7 @@ public:
|
||||
|
||||
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
|
||||
{
|
||||
const auto full_paths = data.getFullPaths(); ///@TODO_IGR ASK What expected size of mutated part? what size should we reserve?
|
||||
MergeTreeMutationEntry entry(commands, full_paths[0], data.insert_increment.get()); ///@TODO_IGR ASK PATH TO ENTRY
|
||||
String file_name;
|
||||
{
|
||||
@ -429,18 +430,22 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
|
||||
void StorageMergeTree::loadMutations()
|
||||
{
|
||||
Poco::DirectoryIterator end;
|
||||
for (auto it = Poco::DirectoryIterator(full_paths[0]); it != end; ++it) ///@TODO_IGR ASK MUTATIONS FROM ALL DISKS?
|
||||
const auto full_paths = data.getFullPaths();
|
||||
for (const String & path : full_paths)
|
||||
{
|
||||
if (startsWith(it.name(), "mutation_"))
|
||||
for (auto it = Poco::DirectoryIterator(path); it != end; ++it)
|
||||
{
|
||||
MergeTreeMutationEntry entry(full_paths[0], it.name());
|
||||
Int64 block_number = entry.block_number;
|
||||
auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry));
|
||||
current_mutations_by_version.emplace(block_number, insertion.first->second);
|
||||
}
|
||||
else if (startsWith(it.name(), "tmp_mutation_"))
|
||||
{
|
||||
it->remove();
|
||||
if (startsWith(it.name(), "mutation_"))
|
||||
{
|
||||
MergeTreeMutationEntry entry(path, it.name());
|
||||
Int64 block_number = entry.block_number;
|
||||
auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry));
|
||||
current_mutations_by_version.emplace(block_number, insertion.first->second);
|
||||
}
|
||||
else if (startsWith(it.name(), "tmp_mutation_"))
|
||||
{
|
||||
it->remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -953,20 +958,23 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
|
||||
|
||||
String source_dir = "detached/";
|
||||
|
||||
const auto full_paths = data.getFullPaths();
|
||||
|
||||
/// Let's make a list of parts to add.
|
||||
ActiveDataPartSet::PartPathNames parts;
|
||||
if (attach_part)
|
||||
{
|
||||
for (const String & full_path : full_paths) {
|
||||
parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id}); ///@TODO_IGR ASK
|
||||
parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
|
||||
///@TODO_IGR ASK ActiveDataPartSet without path? Is it possible here?
|
||||
ActiveDataPartSet active_parts(data.format_version);
|
||||
for (const String & full_path : full_paths) {
|
||||
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) ///@TODO_IGR
|
||||
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
|
||||
{
|
||||
const String & name = it.name();
|
||||
MergeTreePartInfo part_info;
|
||||
@ -976,7 +984,7 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
|
||||
continue;
|
||||
}
|
||||
LOG_DEBUG(log, "Found part " << name);
|
||||
active_parts.add(full_path, name);
|
||||
active_parts.add(full_path, name); ///@TODO_IGR ASK full_path? full_path + detached?
|
||||
}
|
||||
}
|
||||
LOG_DEBUG(log, active_parts.size() << " of them are active");
|
||||
|
@ -90,7 +90,7 @@ public:
|
||||
MergeTreeData & getData() { return data; }
|
||||
const MergeTreeData & getData() const { return data; }
|
||||
|
||||
String getDataPath() const override { return full_paths[0]; } ///@TODO_IGR ASK WHAT PATH
|
||||
Strings getDataPaths() const override { return data.getFullPaths(); }
|
||||
|
||||
ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; }
|
||||
ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); }
|
||||
@ -107,7 +107,7 @@ private:
|
||||
String path;
|
||||
String database_name;
|
||||
String table_name;
|
||||
Strings full_paths;
|
||||
// Strings full_paths;
|
||||
|
||||
Context global_context;
|
||||
BackgroundProcessingPool & background_pool;
|
||||
|
@ -215,7 +215,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name, table_name)),
|
||||
replica_name(global_context.getMacros()->expand(replica_name_, database_name, table_name)),
|
||||
data(database_name, table_name,
|
||||
full_path, columns_, indices_,
|
||||
Schema(std::vector<Strings>{{full_path}}), columns_, indices_,
|
||||
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
|
||||
sample_by_ast_, merging_params_, settings_, true, attach,
|
||||
[this] (const std::string & name) { enqueuePartForCheck(name); }),
|
||||
@ -1046,7 +1046,10 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
||||
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
|
||||
|
||||
/// Can throw an exception.
|
||||
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge);
|
||||
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::tryToReserve(full_path, estimated_space_for_merge);
|
||||
if (!reserved_space) {
|
||||
throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX
|
||||
}
|
||||
|
||||
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
|
||||
|
||||
@ -1176,7 +1179,10 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
|
||||
MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation);
|
||||
|
||||
/// Can throw an exception.
|
||||
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result);
|
||||
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::tryToReserve(full_path, estimated_space_for_result);
|
||||
if (!reserved_space) {
|
||||
throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX
|
||||
}
|
||||
|
||||
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
|
||||
|
||||
@ -1202,7 +1208,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
|
||||
|
||||
try
|
||||
{
|
||||
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context);
|
||||
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context, reserved_space.get());
|
||||
data.renameTempPartAndReplace(new_part, nullptr, &transaction);
|
||||
|
||||
try
|
||||
@ -1694,7 +1700,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
if (part_desc->src_table_part)
|
||||
{
|
||||
/// It is clonable part
|
||||
adding_parts_active_set.add(part_desc->new_part_name);
|
||||
adding_parts_active_set.add(full_path, part_desc->new_part_name);
|
||||
part_name_to_desc.emplace(part_desc->new_part_name, part_desc);
|
||||
continue;
|
||||
}
|
||||
@ -1727,14 +1733,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
part_desc->found_new_part_info = MergeTreePartInfo::fromPartName(found_part_name, data.format_version);
|
||||
part_desc->replica = replica;
|
||||
|
||||
adding_parts_active_set.add(part_desc->found_new_part_name);
|
||||
adding_parts_active_set.add(full_path, part_desc->found_new_part_name);
|
||||
part_name_to_desc.emplace(part_desc->found_new_part_name, part_desc);
|
||||
}
|
||||
|
||||
/// Check that we could cover whole range
|
||||
for (PartDescriptionPtr & part_desc : parts_to_add)
|
||||
{
|
||||
if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
|
||||
if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).name.empty())
|
||||
{
|
||||
throw Exception("Not found part " + part_desc->new_part_name +
|
||||
" (or part covering it) neither source table neither remote replicas" , ErrorCodes::NO_REPLICA_HAS_PART);
|
||||
@ -1744,10 +1750,11 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
/// Filter covered parts
|
||||
PartDescriptions final_parts;
|
||||
{
|
||||
Strings final_part_names = adding_parts_active_set.getParts();
|
||||
auto final_part_names = adding_parts_active_set.getParts();
|
||||
|
||||
for (const String & final_part_name : final_part_names)
|
||||
for (const auto & final_part : final_part_names)
|
||||
{
|
||||
const auto & final_part_name = final_part.name;
|
||||
auto part_desc = part_name_to_desc[final_part_name];
|
||||
if (!part_desc)
|
||||
throw Exception("There is no final part " + final_part_name + ". This is a bug", ErrorCodes::LOGICAL_ERROR);
|
||||
@ -1925,12 +1932,18 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
||||
}
|
||||
|
||||
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
|
||||
Strings parts = zookeeper->getChildren(source_path + "/parts");
|
||||
Strings parts_tmp = zookeeper->getChildren(source_path + "/parts");
|
||||
ActiveDataPartSet::PartPathNames parts;
|
||||
for (const auto & elem : parts_tmp) {
|
||||
parts.push_back(ActiveDataPartSet::PartPathName{"/", elem});
|
||||
}
|
||||
|
||||
ActiveDataPartSet active_parts_set(data.format_version, parts);
|
||||
|
||||
Strings active_parts = active_parts_set.getParts();
|
||||
for (const String & name : active_parts)
|
||||
auto active_parts = active_parts_set.getParts();
|
||||
for (const auto & path_name : active_parts)
|
||||
{
|
||||
const auto & name = path_name.name;
|
||||
LogEntry log_entry;
|
||||
log_entry.type = LogEntry::GET_PART;
|
||||
log_entry.source_replica = "";
|
||||
@ -3553,16 +3566,19 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
|
||||
if (part_info.partition_id != partition_id)
|
||||
continue;
|
||||
LOG_DEBUG(log, "Found part " << name);
|
||||
active_parts.add(name);
|
||||
active_parts.add(full_path, name);
|
||||
part_names.insert(name);
|
||||
}
|
||||
LOG_DEBUG(log, active_parts.size() << " of them are active");
|
||||
parts = active_parts.getParts();
|
||||
auto tmp_parts = active_parts.getParts();
|
||||
for (auto & elem : tmp_parts) {
|
||||
parts.push_back(elem.name);
|
||||
}
|
||||
|
||||
/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
|
||||
for (const auto & name : part_names)
|
||||
{
|
||||
String containing_part = active_parts.getContainingPart(name);
|
||||
String containing_part = active_parts.getContainingPart(name).name;
|
||||
if (!containing_part.empty() && containing_part != name)
|
||||
Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name);
|
||||
}
|
||||
@ -3574,7 +3590,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
|
||||
for (const String & part : parts)
|
||||
{
|
||||
LOG_DEBUG(log, "Checking part " << part);
|
||||
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part));
|
||||
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir, source_dir + part));
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false); /// TODO Allow to use quorum here.
|
||||
@ -4138,7 +4154,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
|
||||
* Unreliable (there is a race condition) - such a partition may appear a little later.
|
||||
*/
|
||||
Poco::DirectoryIterator dir_end;
|
||||
for (Poco::DirectoryIterator dir_it{data.getFullPath() + "detached/"}; dir_it != dir_end; ++dir_it)
|
||||
for (Poco::DirectoryIterator dir_it{data.getFullPaths()[0] + "detached/"}; dir_it != dir_end; ++dir_it)
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, data.format_version)
|
||||
@ -4221,13 +4237,20 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
|
||||
if (try_no >= query_context.getSettings().max_fetch_partition_retries_count)
|
||||
throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS);
|
||||
|
||||
Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
|
||||
Strings parts_tmp = getZooKeeper()->getChildren(best_replica_path + "/parts");
|
||||
ActiveDataPartSet::PartPathNames parts;
|
||||
for (const auto & elem : parts_tmp) {
|
||||
parts.push_back(ActiveDataPartSet::PartPathName{"/", elem});
|
||||
}
|
||||
ActiveDataPartSet active_parts_set(data.format_version, parts);
|
||||
Strings parts_to_fetch;
|
||||
|
||||
if (missing_parts.empty())
|
||||
{
|
||||
parts_to_fetch = active_parts_set.getParts();
|
||||
auto tmp = active_parts_set.getParts();
|
||||
for (auto elem : tmp) {
|
||||
parts_to_fetch.push_back(elem.name);
|
||||
}
|
||||
|
||||
/// Leaving only the parts of the desired partition.
|
||||
Strings parts_to_fetch_partition;
|
||||
@ -4246,7 +4269,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
|
||||
{
|
||||
for (const String & missing_part : missing_parts)
|
||||
{
|
||||
String containing_part = active_parts_set.getContainingPart(missing_part);
|
||||
String containing_part = active_parts_set.getContainingPart(missing_part).name;
|
||||
if (!containing_part.empty())
|
||||
parts_to_fetch.push_back(containing_part);
|
||||
else
|
||||
|
@ -194,7 +194,7 @@ public:
|
||||
part_check_thread.enqueuePart(part_name, delay_to_check_seconds);
|
||||
}
|
||||
|
||||
String getDataPath() const override { return full_path; }
|
||||
Strings getDataPaths() const override { return {full_path}; }
|
||||
|
||||
ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; }
|
||||
ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); }
|
||||
|
@ -25,7 +25,7 @@ public:
|
||||
|
||||
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
|
||||
|
||||
String getDataPath() const override { return path; }
|
||||
Strings getDataPaths() const override { return {path}; }
|
||||
|
||||
protected:
|
||||
StorageSetOrJoinBase(
|
||||
|
@ -51,7 +51,7 @@ public:
|
||||
|
||||
std::string full_path() const { return path + escapeForFileName(name) + '/';}
|
||||
|
||||
String getDataPath() const override { return full_path(); }
|
||||
Strings getDataPaths() const override { return {full_path()}; }
|
||||
|
||||
void truncate(const ASTPtr &, const Context &) override;
|
||||
|
||||
|
@ -50,7 +50,7 @@ public:
|
||||
|
||||
std::string full_path() const { return path + escapeForFileName(name) + '/';}
|
||||
|
||||
String getDataPath() const override { return full_path(); }
|
||||
Strings getDataPaths() const override { return {full_path()}; }
|
||||
|
||||
void truncate(const ASTPtr &, const Context &) override;
|
||||
|
||||
|
@ -195,8 +195,11 @@ protected:
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(0u); // is_temporary
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(tables_it->table()->getDataPath());
|
||||
if (columns_mask[src_index++]) {
|
||||
for (const String & path : tables_it->table()->getDataPaths() ) {
|
||||
res_columns[res_index++]->insert(path); ///@TODO_IGR ASK Is it fine?
|
||||
}
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database->getTableMetadataPath(table_name));
|
||||
|
Loading…
Reference in New Issue
Block a user