polymorphic parts (development) columns sizes

This commit is contained in:
CurtizJ 2019-12-03 03:23:11 +03:00
parent 511ae82e27
commit be0e13d28f
7 changed files with 108 additions and 93 deletions

View File

@ -163,24 +163,6 @@ IMergeTreeDataPart::IMergeTreeDataPart(
}
/// Size of a single column of the part.
/// Forwards to getColumnSizeImpl() without a set of already processed substreams.
ColumnSize IMergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const
{
    std::unordered_set<String> * const no_processed_substreams = nullptr;
    return getColumnSizeImpl(column_name, type, no_processed_substreams);
}
/// Sum of the sizes of all columns of the part.
/// A single set of processed substreams is passed to every getColumnSizeImpl() call
/// (presumably so that a substream shared by several columns is counted only once).
ColumnSize IMergeTreeDataPart::getTotalColumnsSize() const
{
    std::unordered_set<String> seen_substreams;
    ColumnSize result;

    for (const NameAndTypePair & name_and_type : columns)
        result.add(getColumnSizeImpl(name_and_type.name, *name_and_type.type, &seen_substreams));

    return result;
}
String IMergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) const
{
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
@ -198,9 +180,11 @@ String IMergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) c
return new_part_info.getPartName();
}
/// Position of column_name inside sample_block.
/// The std::optional variant returns an empty optional when sample_block has no such column.
/// NOTE(review): this span carries both the pre-change (size_t, unconditional lookup) and the
/// post-change (std::optional<size_t>, guarded lookup) lines of the diff.
size_t IMergeTreeDataPart::getColumnPosition(const String & column_name) const
std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const String & column_name) const
{
return sample_block.getPositionByName(column_name);
/// Guard: report "no position" instead of looking up a column that is absent from sample_block.
if (!sample_block.has(column_name))
return {};
return sample_block.getPositionByName(column_name);
}
DayNum IMergeTreeDataPart::getMinDate() const
@ -513,7 +497,7 @@ void IMergeTreeDataPart::loadRowsCount()
if (!column_col->isFixedAndContiguous() || column_col->lowCardinality())
continue;
size_t column_size = getColumnSizeImpl(column.name, *column.type, nullptr).data_uncompressed;
size_t column_size = getColumnSize(column.name, *column.type).data_uncompressed;
if (!column_size)
continue;

View File

@ -52,6 +52,8 @@ public:
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
using MergeTreeWriterPtr = std::unique_ptr<IMergeTreeDataPartWriter>;
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
virtual MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns_,
const MarkRanges & mark_ranges,
@ -70,23 +72,22 @@ public:
virtual bool isStoredOnDisk() const = 0;
void remove() const;
virtual bool supportsVerticalMerge() const { return false; }
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).
ColumnSize getColumnSize(const String & name, const IDataType & type);
virtual ColumnSize getColumnSize(const String & /* name */, const IDataType & /* type */) const { return {}; }
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
virtual ColumnSize getTotalColumnsSize() const { return {}; }
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
virtual String getColumnNameWithMinumumCompressedSize() const { return columns.front().name; }
virtual ~IMergeTreeDataPart();
// virtual Checksums check(
// bool require_checksums,
// const DataTypes & primary_key_data_types, /// Check the primary key. If it is not necessary, pass an empty array.
@ -97,20 +98,14 @@ public:
// }
using ColumnToSize = std::map<std::string, UInt64>;
virtual void accumulateColumnSizes(ColumnToSize & column_to_size) const;
using Type = MergeTreeDataPartType;
virtual Type getType() const = 0;
// virtual void renameTo() = 0;
static String typeToString(Type type);
String getTypeName() { return typeToString(getType()); }
virtual ~IMergeTreeDataPart();
IMergeTreeDataPart(
const MergeTreeData & storage_,
@ -127,9 +122,11 @@ public:
void assertOnDisk() const;
ColumnSize getColumnSize(const String & column_name, const IDataType & type) const;
void remove() const;
ColumnSize getTotalColumnsSize() const;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; }
@ -138,7 +135,7 @@ public:
String getNewName(const MergeTreePartInfo & new_part_info) const;
// Block sample_block;
size_t getColumnPosition(const String & column_name) const;
std::optional<size_t> getColumnPosition(const String & column_name) const;
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }
@ -348,8 +345,6 @@ private:
void loadPartitionAndMinMaxIndex();
String getRelativePathForDetachedPart(const String & prefix) const;
virtual ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const = 0;
};
using MergeTreeDataPartState = IMergeTreeDataPart::State;

View File

@ -98,38 +98,28 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
default_codec, writer_settings, computed_index_granularity);
}
/// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once.
/// getColumnSize() returns the entry stored in columns_sizes for column_name,
/// or an empty (`{}`) ColumnSize when there is no entry for that column.
/// NOTE(review): the first signature below (getColumnSizeImpl), the UNUSED(...) lines and the
/// commented-out code are pre-change lines of the diff; the rest is the post-change body.
ColumnSize MergeTreeDataPartCompact::getColumnSizeImpl(
const String & column_name, const IDataType & type, std::unordered_set<String> * processed_substreams) const
ColumnSize MergeTreeDataPartCompact::getColumnSize(const String & column_name, const IDataType & /* type */) const
{
UNUSED(column_name);
UNUSED(type);
UNUSED(processed_substreams);
// ColumnSize size;
// if (checksums.empty())
// return size;
/// Look the column up in the columns_sizes map; the type argument is not used.
auto column_size = columns_sizes.find(column_name);
if (column_size == columns_sizes.end())
return {};
return column_size->second;
}
// type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
// {
// String file_name = IDataType::getFileNameForStream(column_name, substream_path);
// if (processed_substreams && !processed_substreams->insert(file_name).second)
// return;
// auto bin_checksum = checksums.files.find(file_name + ".bin");
// if (bin_checksum != checksums.files.end())
// {
// size.data_compressed += bin_checksum->second.file_size;
// size.data_uncompressed += bin_checksum->second.uncompressed_size;
// }
// auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
// if (mrk_checksum != checksums.files.end())
// size.marks += mrk_checksum->second.file_size;
// }, {});
return ColumnSize{};
/// Sum of the sizes of all columns of the part.
/// Marks are shared between all columns, so the marks total is not the per-column sum:
/// it is the first non-zero marks value reported by any column (zero if there is none).
ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const
{
    ColumnSize result;
    size_t shared_marks = 0;

    for (const auto & name_and_type : columns)
    {
        auto current = getColumnSize(name_and_type.name, *name_and_type.type);
        result.add(current);

        /// Until a non-zero value has been seen, keep taking the current column's marks size.
        if (shared_marks == 0)
            shared_marks = current.marks;
    }

    /// Overwrite whatever add() accumulated for marks with the shared value.
    result.marks = shared_marks;
    return result;
}
/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
@ -137,29 +127,26 @@ ColumnSize MergeTreeDataPartCompact::getColumnSizeImpl(
*/
/// Among the storage's physical columns that have a position in this part, picks the one whose
/// data_compressed size (as reported by getColumnSize()) is smallest and returns its name.
/// Throws LOGICAL_ERROR when no column qualifies.
/// NOTE(review): this span interleaves pre-change lines of the diff (the commented-out code and
/// `return columns.front().name;`) with the post-change implementation.
String MergeTreeDataPartCompact::getColumnNameWithMinumumCompressedSize() const
{
/// FIXME: save column sizes
const auto & storage_columns = storage.getColumns().getAllPhysical();
const std::string * minimum_size_column = nullptr;
UInt64 minimum_size = std::numeric_limits<UInt64>::max();
for (const auto & column : storage_columns)
{
/// Skip storage columns for which getColumnPosition() reports no position in this part.
if (!getColumnPosition(column.name))
continue;
// const auto & storage_columns = storage.getColumns().getAllPhysical();
// const std::string * minimum_size_column = nullptr;
// UInt64 minimum_size = std::numeric_limits<UInt64>::max();
auto size = getColumnSize(column.name, *column.type).data_compressed;
/// Strict comparison: on ties the earlier column is kept.
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = &column.name;
}
}
// for (const auto & column : storage_columns)
// {
// if (!hasColumnFiles(column.name, *column.type))
// continue;
// const auto size = getColumnSizeImpl(column.name, *column.type, nullptr).data_compressed;
// if (size < minimum_size)
// {
// minimum_size = size;
// minimum_size_column = &column.name;
// }
// }
// if (!minimum_size_column)
// throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
return columns.front().name;
/// No candidate was found in the loop above.
if (!minimum_size_column)
throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
return *minimum_size_column;
}
void MergeTreeDataPartCompact::loadIndexGranularity()
@ -199,6 +186,28 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
index_granularity.setInitialized();
}
void MergeTreeDataPartCompact::loadColumnSizes()
{
size_t columns_num = columns.size();
if (columns_num == 0)
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
auto column_sizes_path = getFullPath() + "columns_sizes.txt";
auto columns_sizes_file = Poco::File(column_sizes_path);
if (!columns_sizes_file.exists())
{
LOG_WARNING(storage.log, "No file column_sizes.txt in part " + name);
return;
}
ReadBufferFromFile buffer(column_sizes_path, columns_sizes_file.getSize());
auto it = columns.begin();
for (size_t i = 0; i < columns_num; ++i, ++it)
readPODBinary(columns_sizes[it->name], buffer);
assertEOF(buffer);
}
void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata)
{
UNUSED(require_part_metadata);

View File

@ -69,7 +69,11 @@ public:
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const override;
virtual Type getType() const override { return Type::COMPACT; }
Type getType() const override { return Type::COMPACT; }
ColumnSize getColumnSize(const String & name, const IDataType & type0) const override;
ColumnSize getTotalColumnsSize() const override;
void checkConsistency(bool /* require_part_metadata */) const override {}
@ -79,9 +83,12 @@ private:
/// Loads marks index granularity into memory
void loadIndexGranularity() override;
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const override;
void loadColumnSizes();
void checkConsistency(bool require_part_metadata);
ColumnSizeByName columns_sizes;
};

View File

@ -158,6 +158,23 @@ String MergeTreeDataPartWide::getColumnNameWithMinumumCompressedSize() const
return *minimum_size_column;
}
/// Sum of the sizes of all columns of the part.
/// A single set of processed substreams is passed to every getColumnSizeImpl() call
/// (presumably so that a substream shared by several columns is counted only once).
ColumnSize MergeTreeDataPartWide::getTotalColumnsSize() const
{
    std::unordered_set<String> seen_substreams;
    ColumnSize result;

    for (const NameAndTypePair & name_and_type : columns)
        result.add(getColumnSizeImpl(name_and_type.name, *name_and_type.type, &seen_substreams));

    return result;
}
/// Size of a single column of the part.
/// Forwards to getColumnSizeImpl() without a set of already processed substreams.
ColumnSize MergeTreeDataPartWide::getColumnSize(const String & column_name, const IDataType & type) const
{
    std::unordered_set<String> * const no_processed_substreams = nullptr;
    return getColumnSizeImpl(column_name, type, no_processed_substreams);
}
void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullPath();

View File

@ -75,6 +75,10 @@ public:
Type getType() const override { return Type::WIDE; }
ColumnSize getTotalColumnsSize() const override;
ColumnSize getColumnSize(const String & column_name, const IDataType & type) const override;
~MergeTreeDataPartWide() override;
protected:
@ -84,8 +88,7 @@ private:
/// Loads marks index granularity into memory
void loadIndexGranularity() override;
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const override;
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const;
};
// using MergeTreeDataPartState =IMergeTreeDataPart::State;

View File

@ -79,7 +79,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
try
{
size_t column_size_before_reading = column->size();
size_t column_position = data_part->getColumnPosition(it.name);
auto column_position = *data_part->getColumnPosition(it.name);
readData(it.name, *it.type, *column, from_mark, column_position, rows_to_read);