diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 0f701cc4adf..93149f87f99 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -439,9 +439,13 @@ void IMergeTreeDataPart::removeIfNeeded() if (file_name.empty()) throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR); - if (!startsWith(file_name, "tmp")) + if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj")) { - LOG_ERROR(storage.log, "~DataPart() should remove part {} but its name doesn't start with tmp. Too suspicious, keeping the part.", path); + LOG_ERROR( + storage.log, + "~DataPart() should remove part {} but its name doesn't start with \"tmp\" or end with \".tmp_proj\". Too " + "suspicious, keeping the part.", + path); return; } } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index ceb3ed64170..c889d76c2b4 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -184,6 +184,7 @@ public: /// A directory path (relative to storage's path) where part data is actually stored /// Examples: 'detached/tmp_fetch_', 'tmp_', '' + /// NOTE: Cannot have trailing slash. mutable String relative_path; MergeTreeIndexGranularityInfo index_granularity_info; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 357659b3bbb..aa3f91a4f00 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -89,7 +89,10 @@ static void extractMergingAndGatheringColumns( bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() { - const String local_tmp_prefix = global_ctx->parent_part ? ctx->prefix : "tmp_merge_"; + // projection parts have different prefix and suffix compared to normal parts. + // E.g. 
`proj_a.proj` for a normal projection merge and `proj_a.tmp_proj` for a projection materialization merge. + const String local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_"; + const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : ""; if (global_ctx->merges_blocker->isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); @@ -114,7 +117,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() } ctx->disk = global_ctx->space_reservation->getDisk(); - auto local_new_part_tmp_path = global_ctx->data->relative_data_path + local_tmp_prefix + global_ctx->future_part->name + (global_ctx->parent_part ? ".proj" : "") + "/"; + auto local_new_part_relative_tmp_path_name = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix; + auto local_new_part_tmp_path = global_ctx->data->relative_data_path + local_new_part_relative_tmp_path_name + "/"; if (ctx->disk->exists(local_new_part_tmp_path)) throw Exception("Directory " + fullPath(ctx->disk, local_new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); @@ -138,7 +142,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() global_ctx->future_part->type, global_ctx->future_part->part_info, local_single_disk_volume, - local_tmp_prefix + global_ctx->future_part->name + (global_ctx->parent_part ? ".proj" : ""), + local_new_part_relative_tmp_path_name, global_ctx->parent_part); global_ctx->new_data_part->uuid = global_ctx->future_part->uuid; @@ -526,7 +530,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c auto projection_future_part = std::make_shared(); projection_future_part->assign(std::move(projection_parts)); projection_future_part->name = projection.name; - projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/"; + // TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts. 
+ // Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection. + // projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/"; projection_future_part->part_info = {"all", 0, 0, 0}; MergeTreeData::MergingParams projection_merging_params; @@ -553,7 +559,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c global_ctx->deduplicate_by_columns, projection_merging_params, global_ctx->new_data_part.get(), - "", // empty string for projection + ".proj", global_ctx->data, global_ctx->merges_blocker, global_ctx->ttl_merges_blocker)); diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 05903f94c91..22dc70bd78c 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -58,7 +58,7 @@ public: Names deduplicate_by_columns_, MergeTreeData::MergingParams merging_params_, const IMergeTreeDataPart * parent_part_, - String prefix_, + String suffix_, MergeTreeData * data_, ActionBlocker * merges_blocker_, ActionBlocker * ttl_merges_blocker_) @@ -83,7 +83,7 @@ public: auto prepare_stage_ctx = std::make_shared(); - prepare_stage_ctx->prefix = std::move(prefix_); + prepare_stage_ctx->suffix = std::move(suffix_); prepare_stage_ctx->merging_params = std::move(merging_params_); (*stages.begin())->setRuntimeContext(std::move(prepare_stage_ctx), global_ctx); @@ -170,7 +170,7 @@ private: struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext //-V730 { /// Dependencies - String prefix; + String suffix; MergeTreeData::MergingParams merging_params{}; DiskPtr tmp_disk{nullptr}; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 903f4cd27fc..5d97c64b49b 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -428,7 +428,7 
@@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( const Names & deduplicate_by_columns, const MergeTreeData::MergingParams & merging_params, const IMergeTreeDataPart * parent_part, - const String & prefix) + const String & suffix) { return std::make_shared( future_part, @@ -442,7 +442,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( deduplicate_by_columns, merging_params, parent_part, - prefix, + suffix, &data, &merges_blocker, &ttl_merges_blocker); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 9eb91d7fbf8..22650ac4eca 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -108,7 +108,7 @@ public: const Names & deduplicate_by_columns, const MergeTreeData::MergingParams & merging_params, const IMergeTreeDataPart * parent_part = nullptr, - const String & prefix = ""); + const String & suffix = ""); /// Mutate a single data part with the specified commands. Will create and return a temporary part. 
MutateTaskPtr mutatePartToTemporaryPart( diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 88b0878bb74..58d8974b629 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -173,9 +173,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( auto projection_plan = std::make_unique(); if (query_info.projection->desc->is_minmax_count_projection) { - Pipe pipe(std::make_shared( - query_info.minmax_count_projection_block.cloneEmpty(), - Chunk(query_info.minmax_count_projection_block.getColumns(), query_info.minmax_count_projection_block.rows()))); + Pipe pipe(std::make_shared(query_info.minmax_count_projection_block)); auto read_from_pipe = std::make_unique(std::move(pipe)); projection_plan->addStep(std::move(read_from_pipe)); } diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index d939312c0bb..752f85a1290 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -575,7 +575,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart( return writeProjectionPartImpl( part_name, part_type, - "tmp_insert_" + part_name + ".proj" /* relative_path */, + part_name + ".tmp_proj" /* relative_path */, true /* is_temp */, parent_part, data, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 115de043cd2..66d0f5f7e49 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -654,7 +654,7 @@ public: {}, projection_merging_params, ctx->new_data_part.get(), - "tmp_merge_"); + ".tmp_proj"); next_level_parts.push_back(executeHere(tmp_part_merge_task)); @@ -832,8 +832,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() auto projection_block = projection_squash.add({}); if (projection_block) { - 
projection_parts[projection.name].emplace_back( - MergeTreeDataWriter::writeTempProjectionPart(*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num)); + projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( + *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num)); } } @@ -1082,7 +1082,7 @@ private: if (!ctx->disk->isDirectory(it->path())) ctx->disk->createHardLink(it->path(), destination); - else if (!startsWith("tmp_", it->name())) // ignore projection tmp merge dir + else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir { // it's a projection part directory ctx->disk->createDirectories(destination); diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index 8a234833da7..0af395fd1bd 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -102,7 +102,7 @@ IMergeTreeDataPart::Checksums checkDataPart( /// It also calculates checksum of projections. 
auto checksum_file = [&](const String & file_path, const String & file_name) { - if (disk->isDirectory(file_path) && endsWith(file_name, ".proj") && !startsWith(file_name, "tmp_")) // ignore projection tmp merge dir + if (disk->isDirectory(file_path) && endsWith(file_name, ".proj")) { auto projection_name = file_name.substr(0, file_name.size() - sizeof(".proj") + 1); auto pit = data_part->getProjectionParts().find(projection_name); @@ -124,7 +124,8 @@ IMergeTreeDataPart::Checksums checkDataPart( auto file_buf = disk->readFile(proj_path); HashingReadBuffer hashing_buf(*file_buf); hashing_buf.ignoreAll(); - projection_checksums_data.files[MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash()); + projection_checksums_data.files[MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION] + = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash()); } else { @@ -140,7 +141,8 @@ IMergeTreeDataPart::Checksums checkDataPart( [&](const ISerialization::SubstreamPath & substream_path) { String projection_file_name = ISerialization::getFileNameForStream(projection_column, substream_path) + ".bin"; - checksums_data.files[projection_file_name] = checksum_compressed_file(disk, projection_path + projection_file_name); + checksums_data.files[projection_file_name] + = checksum_compressed_file(disk, projection_path + projection_file_name); }, {}); } diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index c0b96bd9f54..42294b8152c 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -89,9 +89,6 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const if (projection_definition->name.empty()) throw Exception("Projection must have name in definition.", ErrorCodes::INCORRECT_QUERY); - if (startsWith(projection_definition->name, "tmp_")) - throw 
Exception("Projection's name cannot start with 'tmp_'", ErrorCodes::INCORRECT_QUERY); - if (!projection_definition->query) throw Exception("QUERY is required for projection", ErrorCodes::INCORRECT_QUERY); @@ -220,13 +217,13 @@ void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription & Block ProjectionDescription::calculate(const Block & block, ContextPtr context) const { auto builder = InterpreterSelectQuery( - query_ast, - context, - Pipe(std::make_shared(block, Chunk(block.getColumns(), block.rows()))), - SelectQueryOptions{ - type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns - : QueryProcessingStage::WithMergeableState}) - .buildQueryPipeline(); + query_ast, + context, + Pipe(std::make_shared(block, Chunk(block.getColumns(), block.rows()))), + SelectQueryOptions{ + type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns + : QueryProcessingStage::WithMergeableState}) + .buildQueryPipeline(); builder.resize(1); builder.addTransform(std::make_shared(builder.getHeader(), block.rows(), 0));