mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Fix getting column sample for not finalized part
This commit is contained in:
parent
2ec1c88092
commit
1157028e35
@ -952,7 +952,7 @@ ColumnPtr ColumnVariant::permute(const Permutation & perm, size_t limit) const
|
||||
if (hasOnlyNulls())
|
||||
{
|
||||
if (limit)
|
||||
return cloneResized(limit);
|
||||
return cloneResized(limit ? std::min(size(), limit) : size());
|
||||
|
||||
/// If no limit, we can just return current immutable column.
|
||||
return this->getPtr();
|
||||
|
@ -2252,18 +2252,18 @@ void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metad
|
||||
proj_part->checkConsistency(require_part_metadata);
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk()
|
||||
void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample)
|
||||
{
|
||||
calculateColumnsSizesOnDisk();
|
||||
calculateColumnsSizesOnDisk(columns_sample);
|
||||
calculateSecondaryIndicesSizesOnDisk();
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::calculateColumnsSizesOnDisk()
|
||||
void IMergeTreeDataPart::calculateColumnsSizesOnDisk(std::optional<Block> columns_sample)
|
||||
{
|
||||
if (getColumns().empty() || checksums.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized");
|
||||
|
||||
calculateEachColumnSizes(columns_sizes, total_columns_size);
|
||||
calculateEachColumnSizes(columns_sizes, total_columns_size, columns_sample);
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk()
|
||||
@ -2501,7 +2501,7 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co
|
||||
{
|
||||
const size_t total_mark = getMarksCount();
|
||||
/// If column doesn't have dynamic subcolumns or part has no data, just create column using its type.
|
||||
if (is_temp || !column.type->hasDynamicSubcolumns() || !total_mark)
|
||||
if (!column.type->hasDynamicSubcolumns() || !total_mark)
|
||||
return column.type->createColumn();
|
||||
|
||||
/// Otherwise, read sample column with 0 rows from the part, so it will load dynamic structure.
|
||||
@ -2527,6 +2527,7 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co
|
||||
|
||||
Columns result;
|
||||
result.resize(1);
|
||||
LOG_DEBUG(getLogger("IMergeTreeDataPart"), "getColumnSample");
|
||||
reader->readRows(0, total_mark, false, 0, result);
|
||||
return result[0];
|
||||
}
|
||||
|
@ -426,7 +426,7 @@ public:
|
||||
bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const;
|
||||
|
||||
/// Calculate column and secondary indices sizes on disk.
|
||||
void calculateColumnsAndSecondaryIndicesSizesOnDisk();
|
||||
void calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
|
||||
|
||||
std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const;
|
||||
|
||||
@ -631,7 +631,7 @@ protected:
|
||||
|
||||
/// Fill each_columns_size and total_size with sizes from columns files on
|
||||
/// disk using columns and checksums.
|
||||
virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0;
|
||||
virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const = 0;
|
||||
|
||||
std::optional<String> getRelativePathForDetachedPart(const String & prefix, bool broken) const;
|
||||
|
||||
@ -713,7 +713,7 @@ private:
|
||||
|
||||
void loadPartitionAndMinMaxIndex();
|
||||
|
||||
void calculateColumnsSizesOnDisk();
|
||||
void calculateColumnsSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
|
||||
|
||||
void calculateSecondaryIndicesSizesOnDisk();
|
||||
|
||||
|
@ -54,6 +54,8 @@ public:
|
||||
|
||||
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
|
||||
|
||||
virtual Block getColumnsSample() const = 0;
|
||||
|
||||
protected:
|
||||
SerializationPtr getSerialization(const String & column_name) const;
|
||||
|
||||
|
@ -80,7 +80,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const
|
||||
void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size, std::optional<Block> /*columns_sample*/) const
|
||||
{
|
||||
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
|
||||
if (bin_checksum != checksums.files.end())
|
||||
|
@ -70,7 +70,7 @@ private:
|
||||
void loadIndexGranularity() override;
|
||||
|
||||
/// Compact parts don't support per column size, only total size
|
||||
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
|
||||
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
|
||||
/// Takes into account the fact that several columns can e.g. share their .size substreams.
|
||||
/// When calculating totals these should be counted only once.
|
||||
ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
|
||||
const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const
|
||||
const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const
|
||||
{
|
||||
ColumnSize size;
|
||||
if (checksums.empty())
|
||||
@ -108,7 +108,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
|
||||
auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension());
|
||||
if (mrk_checksum != checksums.files.end())
|
||||
size.marks += mrk_checksum->second.file_size;
|
||||
}, column.type, getColumnSample(column));
|
||||
}, column.type, columns_sample && columns_sample->has(column.name) ? columns_sample->getByName(column.name).column : getColumnSample(column));
|
||||
|
||||
return size;
|
||||
}
|
||||
@ -374,12 +374,12 @@ std::optional<String> MergeTreeDataPartWide::getFileNameForColumn(const NameAndT
|
||||
return filename;
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const
|
||||
void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const
|
||||
{
|
||||
std::unordered_set<String> processed_substreams;
|
||||
for (const auto & column : columns)
|
||||
{
|
||||
ColumnSize size = getColumnSizeImpl(column, &processed_substreams);
|
||||
ColumnSize size = getColumnSizeImpl(column, &processed_substreams, columns_sample);
|
||||
each_columns_size[column.name] = size;
|
||||
total_size.add(size);
|
||||
|
||||
|
@ -64,9 +64,9 @@ private:
|
||||
/// Loads marks index granularity into memory
|
||||
void loadIndexGranularity() override;
|
||||
|
||||
ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const;
|
||||
ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const;
|
||||
|
||||
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
|
||||
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
|
||||
|
||||
};
|
||||
|
||||
|
@ -123,6 +123,8 @@ public:
|
||||
written_offset_columns = written_offset_columns_;
|
||||
}
|
||||
|
||||
Block getColumnsSample() const override { return block_sample; }
|
||||
|
||||
protected:
|
||||
/// Count index_granularity for block and store in `index_granularity`
|
||||
size_t computeIndexGranularity(const Block & block) const;
|
||||
|
@ -207,7 +207,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
||||
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
|
||||
new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk());
|
||||
new_part->index_granularity = writer->getIndexGranularity();
|
||||
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
|
||||
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample());
|
||||
|
||||
/// In mutation, existing_rows_count is already calculated in PartMergerWriter
|
||||
/// In merge situation, lightweight deleted rows were physically deleted, so existing_rows_count equals rows_count
|
||||
|
Loading…
Reference in New Issue
Block a user