This commit is contained in:
Alexey Milovidov 2020-08-28 03:53:22 +03:00
parent ed1d120de0
commit 5763737d97
3 changed files with 24 additions and 4 deletions

View File

@ -3262,7 +3262,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
}
if (!does_storage_policy_allow_same_disk)
throw Exception(
"Could not clone and load part " + quoteString(src_part->getFullPath()) + " because disk does not belong to storage policy", ErrorCodes::BAD_ARGUMENTS);
"Could not clone and load part " + quoteString(src_part->getFullPath()) + " because disk does not belong to storage policy",
ErrorCodes::BAD_ARGUMENTS);
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;

View File

@ -113,6 +113,7 @@ namespace ErrorCodes
extern const int ALL_REPLICAS_LOST;
extern const int REPLICA_STATUS_CHANGED;
extern const int CANNOT_ASSIGN_ALTER;
extern const int DIRECTORY_ALREADY_EXISTS;
}
namespace ActionLocks
@ -3256,6 +3257,15 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
part->renameTo("detached/" + part_name, true);
}
}
catch (const Exception & e)
{
/// The same part is being written right now (but probably it's not committed yet).
/// We will check the need for fetch later.
if (e.code() == ErrorCodes::DIRECTORY_ALREADY_EXISTS)
return false;
throw;
}
catch (...)
{
if (!to_detached)
@ -4689,9 +4699,11 @@ void StorageReplicatedMergeTree::fetchPartition(
missing_parts.clear();
for (const String & part : parts_to_fetch)
{
bool fetched = false;
try
{
fetchPart(part, metadata_snapshot, best_replica_path, true, 0);
fetched = fetchPart(part, metadata_snapshot, best_replica_path, true, 0);
}
catch (const DB::Exception & e)
{
@ -4700,8 +4712,10 @@ void StorageReplicatedMergeTree::fetchPartition(
throw;
LOG_INFO(log, e.displayText());
missing_parts.push_back(part);
}
if (!fetched)
missing_parts.push_back(part);
}
++try_no;

View File

@ -478,7 +478,12 @@ private:
* If quorum != 0, then the node for tracking the quorum is updated.
* Returns false if part is already fetching right now.
*/
bool fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot, const String & replica_path, bool to_detached, size_t quorum);
bool fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
bool to_detached,
size_t quorum);
/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;