Merge pull request #29520 from amosbird/projection-improve4

Get rid of the naming limitation of projections: projection names may now start with "tmp_", because temporary projection parts are recognized by the ".tmp_proj" suffix instead of a "tmp_" prefix.
Nikolai Kochetov 2021-10-12 14:25:29 +03:00 committed by GitHub
commit 077aba4a97
11 changed files with 42 additions and 34 deletions

View File

@@ -439,9 +439,13 @@ void IMergeTreeDataPart::removeIfNeeded()
if (file_name.empty())
throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR);
if (!startsWith(file_name, "tmp"))
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj"))
{
LOG_ERROR(storage.log, "~DataPart() should remove part {} but its name doesn't start with tmp. Too suspicious, keeping the part.", path);
LOG_ERROR(
storage.log,
"~DataPart() should remove part {} but its name doesn't start with \"tmp\" or end with \".tmp_proj\". Too "
"suspicious, keeping the part.",
path);
return;
}
}
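A minimal standalone sketch of the condition above, assuming C++20 string_view helpers rather than ClickHouse's own startsWith/endsWith; the helper name looksLikeTemporaryPart is hypothetical. The destructor only removes a part directory whose name marks it as temporary, either by the "tmp" prefix or by the new ".tmp_proj" suffix.

#include <string_view>

// Hypothetical mirror of the check in removeIfNeeded(): temporary part directories
// start with "tmp" (e.g. "tmp_merge_<name>") or end with ".tmp_proj" (temporary
// projection parts); anything else is considered suspicious and kept.
static bool looksLikeTemporaryPart(std::string_view file_name)
{
    return file_name.starts_with("tmp") || file_name.ends_with(".tmp_proj");
}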

View File

@@ -184,6 +184,7 @@ public:
/// A directory path (relative to storage's path) where part data is actually stored
/// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
/// NOTE: Cannot have trailing slash.
mutable String relative_path;
MergeTreeIndexGranularityInfo index_granularity_info;

View File

@@ -89,7 +89,10 @@ static void extractMergingAndGatheringColumns(
bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
{
const String local_tmp_prefix = global_ctx->parent_part ? ctx->prefix : "tmp_merge_";
// projection parts have different prefix and suffix compared to normal parts.
// E.g. `proj_a.proj` for a normal projection merge and `proj_a.tmp_proj` for a projection materialization merge.
const String local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_";
const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : "";
if (global_ctx->merges_blocker->isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
@@ -114,7 +117,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
}
ctx->disk = global_ctx->space_reservation->getDisk();
auto local_new_part_tmp_path = global_ctx->data->relative_data_path + local_tmp_prefix + global_ctx->future_part->name + (global_ctx->parent_part ? ".proj" : "") + "/";
auto local_new_part_relative_tmp_path_name = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix;
auto local_new_part_tmp_path = global_ctx->data->relative_data_path + local_new_part_relative_tmp_path_name + "/";
if (ctx->disk->exists(local_new_part_tmp_path))
throw Exception("Directory " + fullPath(ctx->disk, local_new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
@@ -138,7 +142,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->future_part->type,
global_ctx->future_part->part_info,
local_single_disk_volume,
local_tmp_prefix + global_ctx->future_part->name + (global_ctx->parent_part ? ".proj" : ""),
local_new_part_relative_tmp_path_name,
global_ctx->parent_part);
global_ctx->new_data_part->uuid = global_ctx->future_part->uuid;
@@ -526,7 +530,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() const
auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
projection_future_part->assign(std::move(projection_parts));
projection_future_part->name = projection.name;
projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/";
// TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts.
// Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection.
// projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/";
projection_future_part->part_info = {"all", 0, 0, 0};
MergeTreeData::MergingParams projection_merging_params;
@@ -553,7 +559,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() const
global_ctx->deduplicate_by_columns,
projection_merging_params,
global_ctx->new_data_part.get(),
"", // empty string for projection
".proj",
global_ctx->data,
global_ctx->merges_blocker,
global_ctx->ttl_merges_blocker));
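A rough sketch of how prepare() composes the temporary directory name above, with hypothetical parameter names standing in for the global_ctx/ctx fields and illustrative part names: a top-level merge gets the "tmp_merge_" prefix and no suffix, while a projection merge (parent_part set) gets no prefix and the caller-supplied suffix, ".proj" for a regular projection merge or ".tmp_proj" for a projection materialization merge.

#include <string>

// Hypothetical mirror of the prefix/suffix selection in ExecuteAndFinalizeHorizontalPart::prepare().
static std::string composeTmpPartDirName(
    const std::string & future_part_name,
    bool is_projection_merge,            // stands in for global_ctx->parent_part != nullptr
    const std::string & caller_suffix)   // stands in for ctx->suffix
{
    const std::string prefix = is_projection_merge ? "" : "tmp_merge_";
    const std::string suffix = is_projection_merge ? caller_suffix : "";
    return prefix + future_part_name + suffix;
}

// composeTmpPartDirName("all_1_2_1", false, "")      -> "tmp_merge_all_1_2_1"
// composeTmpPartDirName("proj_a", true, ".proj")     -> "proj_a.proj"
// composeTmpPartDirName("proj_a", true, ".tmp_proj") -> "proj_a.tmp_proj"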

View File

@@ -58,7 +58,7 @@ public:
Names deduplicate_by_columns_,
MergeTreeData::MergingParams merging_params_,
const IMergeTreeDataPart * parent_part_,
String prefix_,
String suffix_,
MergeTreeData * data_,
ActionBlocker * merges_blocker_,
ActionBlocker * ttl_merges_blocker_)
@@ -83,7 +83,7 @@ public:
auto prepare_stage_ctx = std::make_shared<ExecuteAndFinalizeHorizontalPartRuntimeContext>();
prepare_stage_ctx->prefix = std::move(prefix_);
prepare_stage_ctx->suffix = std::move(suffix_);
prepare_stage_ctx->merging_params = std::move(merging_params_);
(*stages.begin())->setRuntimeContext(std::move(prepare_stage_ctx), global_ctx);
@@ -170,7 +170,7 @@ private:
struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext //-V730
{
/// Dependencies
String prefix;
String suffix;
MergeTreeData::MergingParams merging_params{};
DiskPtr tmp_disk{nullptr};

View File

@@ -428,7 +428,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const IMergeTreeDataPart * parent_part,
const String & prefix)
const String & suffix)
{
return std::make_shared<MergeTask>(
future_part,
@@ -442,7 +442,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
deduplicate_by_columns,
merging_params,
parent_part,
prefix,
suffix,
&data,
&merges_blocker,
&ttl_merges_blocker);

View File

@@ -108,7 +108,7 @@ public:
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const IMergeTreeDataPart * parent_part = nullptr,
const String & prefix = "");
const String & suffix = "");
/// Mutate a single data part with the specified commands. Will create and return a temporary part.
MutateTaskPtr mutatePartToTemporaryPart(

View File

@@ -173,9 +173,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto projection_plan = std::make_unique<QueryPlan>();
if (query_info.projection->desc->is_minmax_count_projection)
{
Pipe pipe(std::make_shared<SourceFromSingleChunk>(
query_info.minmax_count_projection_block.cloneEmpty(),
Chunk(query_info.minmax_count_projection_block.getColumns(), query_info.minmax_count_projection_block.rows())));
Pipe pipe(std::make_shared<SourceFromSingleChunk>(query_info.minmax_count_projection_block));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_plan->addStep(std::move(read_from_pipe));
}

View File

@@ -575,7 +575,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart(
return writeProjectionPartImpl(
part_name,
part_type,
"tmp_insert_" + part_name + ".proj" /* relative_path */,
part_name + ".tmp_proj" /* relative_path */,
true /* is_temp */,
parent_part,
data,
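For illustration (with a hypothetical part_name), the rename above moves the temporary marker from a prefix to a suffix: a temporary projection part written during an INSERT or mutation used to live in "tmp_insert_<part_name>.proj" and is now written to "<part_name>.tmp_proj" inside the parent part's directory.

#include <string>

// Old vs. new temporary projection part directory names (illustrative only).
static std::string tempProjectionDirOld(const std::string & part_name) { return "tmp_insert_" + part_name + ".proj"; }
static std::string tempProjectionDirNew(const std::string & part_name) { return part_name + ".tmp_proj"; }

// e.g. for a hypothetical part_name "proj_a_1":
// tempProjectionDirOld("proj_a_1") -> "tmp_insert_proj_a_1.proj"
// tempProjectionDirNew("proj_a_1") -> "proj_a_1.tmp_proj"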

View File

@@ -654,7 +654,7 @@ public:
{},
projection_merging_params,
ctx->new_data_part.get(),
"tmp_merge_");
".tmp_proj");
next_level_parts.push_back(executeHere(tmp_part_merge_task));
@@ -832,8 +832,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
auto projection_block = projection_squash.add({});
if (projection_block)
{
projection_parts[projection.name].emplace_back(
MergeTreeDataWriter::writeTempProjectionPart(*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num));
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num));
}
}
@@ -1082,7 +1082,7 @@ private:
if (!ctx->disk->isDirectory(it->path()))
ctx->disk->createHardLink(it->path(), destination);
else if (!startsWith("tmp_", it->name())) // ignore projection tmp merge dir
else if (!endsWith(".tmp_proj", it->name())) // ignore projection tmp merge dir
{
// it's a projection part directory
ctx->disk->createDirectories(destination);

View File

@@ -102,7 +102,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
/// It also calculates checksum of projections.
auto checksum_file = [&](const String & file_path, const String & file_name)
{
if (disk->isDirectory(file_path) && endsWith(file_name, ".proj") && !startsWith(file_name, "tmp_")) // ignore projection tmp merge dir
if (disk->isDirectory(file_path) && endsWith(file_name, ".proj"))
{
auto projection_name = file_name.substr(0, file_name.size() - sizeof(".proj") + 1);
auto pit = data_part->getProjectionParts().find(projection_name);
@@ -124,7 +124,8 @@ IMergeTreeDataPart::Checksums checkDataPart(
auto file_buf = disk->readFile(proj_path);
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.ignoreAll();
projection_checksums_data.files[MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
projection_checksums_data.files[MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION]
= IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
}
else
{
@@ -140,7 +141,8 @@ IMergeTreeDataPart::Checksums checkDataPart(
[&](const ISerialization::SubstreamPath & substream_path)
{
String projection_file_name = ISerialization::getFileNameForStream(projection_column, substream_path) + ".bin";
checksums_data.files[projection_file_name] = checksum_compressed_file(disk, projection_path + projection_file_name);
checksums_data.files[projection_file_name]
= checksum_compressed_file(disk, projection_path + projection_file_name);
},
{});
}
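A standalone illustration of the projection-name extraction used in the checksum lambda above; the helper name is hypothetical. sizeof(".proj") counts the terminating '\0', so subtracting it and adding 1 strips exactly the five-character ".proj" suffix from the directory name.

#include <string>

// Strip the ".proj" suffix from a projection part directory name, as done above.
// sizeof(".proj") == 6 (includes '\0'), so size() - 6 + 1 removes the last 5 characters.
static std::string projectionNameFromDirName(const std::string & file_name)
{
    return file_name.substr(0, file_name.size() - sizeof(".proj") + 1);
}

// projectionNameFromDirName("proj_a.proj") -> "proj_a"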

View File

@@ -89,9 +89,6 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
if (projection_definition->name.empty())
throw Exception("Projection must have name in definition.", ErrorCodes::INCORRECT_QUERY);
if (startsWith(projection_definition->name, "tmp_"))
throw Exception("Projection's name cannot start with 'tmp_'", ErrorCodes::INCORRECT_QUERY);
if (!projection_definition->query)
throw Exception("QUERY is required for projection", ErrorCodes::INCORRECT_QUERY);
@@ -220,13 +217,13 @@ void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription &
Block ProjectionDescription::calculate(const Block & block, ContextPtr context) const
{
auto builder = InterpreterSelectQuery(
query_ast,
context,
Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
SelectQueryOptions{
type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns
: QueryProcessingStage::WithMergeableState})
.buildQueryPipeline();
query_ast,
context,
Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
SelectQueryOptions{
type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns
: QueryProcessingStage::WithMergeableState})
.buildQueryPipeline();
builder.resize(1);
builder.addTransform(std::make_shared<SquashingChunksTransform>(builder.getHeader(), block.rows(), 0));