This commit is contained in:
Alexander Tokmakov 2022-08-10 15:48:56 +02:00
parent 54614f5b98
commit fff903ee81
8 changed files with 31 additions and 8 deletions

View File

@ -582,6 +582,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
tryLogCurrentException(log);
}
temporary_directory_lock = {};
/// Try again but without zero-copy
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, throttler, to_detached, tmp_prefix, nullptr, false, disk);

View File

@ -142,7 +142,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (global_ctx->data_part_storage_builder->exists())
throw Exception("Directory " + global_ctx->data_part_storage_builder->getFullPath() + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
global_ctx->temporary_directory_lock = global_ctx->data->getTemporaryPartDirectoryHolder(local_tmp_part_basename);
if (!global_ctx->parent_part)
global_ctx->temporary_directory_lock = global_ctx->data->getTemporaryPartDirectoryHolder(local_tmp_part_basename);
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
global_ctx->storage_columns = global_ctx->metadata_snapshot->getColumns().getAllPhysical();

View File

@ -2864,6 +2864,13 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction
part->is_temp = false;
part->setState(DataPartState::PreActive);
assert([&]()
{
String dir_name = fs::path(part->data_part_storage->getRelativePath()).filename();
bool may_be_cleaned_up = dir_name.starts_with("tmp_") || dir_name.starts_with("tmp-fetch_");
return !may_be_cleaned_up || temporary_parts.contains(dir_name);
}());
part->renameTo(part->name, true, builder);
data_parts_indexes.insert(part);

View File

@ -15,10 +15,10 @@ bool TemporaryParts::contains(const std::string & basename) const
return parts.contains(basename);
}
void TemporaryParts::add(std::string basename)
void TemporaryParts::add(const std::string & basename)
{
std::lock_guard lock(mutex);
bool inserted = parts.emplace(std::move(basename)).second;
bool inserted = parts.emplace(basename).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary part {} already added", basename);
}

View File

@ -18,6 +18,10 @@ private:
/// NOTE: It is pretty short, so use STL is fine.
std::unordered_set<std::string> parts;
void add(const std::string & basename);
void remove(const std::string & basename);
friend class MergeTreeData;
public:
/// Returns true if passed part name is active.
/// (is the destination for one of active mutation/merge).
@ -25,9 +29,6 @@ public:
/// NOTE: that it accept basename (i.e. dirname), not the path,
/// since later requires canonical form.
bool contains(const std::string & basename) const;
void add(std::string basename);
void remove(const std::string & basename);
};
}

View File

@ -2,7 +2,7 @@
<merge_tree>
<!-- 10 seconds (default is 1 minute) -->
<zookeeper_session_expiration_check_period>10</zookeeper_session_expiration_check_period>
<!-- Default is 86400 (1 day), but we have protection from removal of tmp dirs in use -->
<temporary_directories_lifetime>0</temporary_directories_lifetime>
<!-- Default is 86400 (1 day), but we have protection from removal of tmp dirs that are currently in use -->
<temporary_directories_lifetime>1</temporary_directories_lifetime>
</merge_tree>
</clickhouse>

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS test_inserts;
CREATE TABLE test_inserts (`key` Int, `part` Int) ENGINE = MergeTree PARTITION BY part ORDER BY key
SETTINGS temporary_directories_lifetime = 0, merge_tree_clear_old_temporary_directories_interval_seconds = 0;
INSERT INTO test_inserts SELECT sleep(1), number FROM numbers(10)
SETTINGS max_insert_delayed_streams_for_parallel_write = 100, max_insert_block_size = 1, min_insert_block_size_rows = 1;
SELECT count(), sum(part) FROM test_inserts;
DROP TABLE test_inserts;