Remove almost useless column sizes from compact parts

All columns of a compact part live in a single data file, so a per-column compressed size cannot be attributed reliably. Drop the columns_sizes.txt file and the in-memory ColumnSizeByName bookkeeping, compute the total size of a compact part from the checksums of its data and marks files, and move the now-identical getColumnNameWithMinumumCompressedSize implementations from the Wide and Compact parts into IMergeTreeDataPart.

CurtizJ 2020-01-15 21:24:10 +03:00
parent b3bd306a5d
commit 3ff8f424ed
9 changed files with 50 additions and 148 deletions

IMergeTreeDataPart.cpp

@@ -357,6 +357,32 @@ size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
     return checksum->second.file_size;
 }
 
+String IMergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
+{
+    const auto & storage_columns = storage.getColumns().getAllPhysical();
+    const std::string * minimum_size_column = nullptr;
+    UInt64 minimum_size = std::numeric_limits<UInt64>::max();
+
+    for (const auto & column : storage_columns)
+    {
+        if (!hasColumnFiles(column.name, *column.type))
+            continue;
+
+        const auto size = getColumnSize(column.name, *column.type).data_compressed;
+        if (size < minimum_size)
+        {
+            minimum_size = size;
+            minimum_size_column = &column.name;
+        }
+    }
+
+    if (!minimum_size_column)
+        throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
+
+    return *minimum_size_column;
+}
+
 String IMergeTreeDataPart::getFullPath() const
 {
     assertOnDisk();

@@ -380,7 +406,6 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
     loadChecksums(require_columns_checksums);
     loadIndexGranularity();
     loadIndex();     /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
-    loadColumnSizes();
     loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `index_granularity`.
     loadPartitionAndMinMaxIndex();
     loadTTLInfos();

@@ -490,13 +515,13 @@ void IMergeTreeDataPart::loadChecksums(bool require)
 
 void IMergeTreeDataPart::loadRowsCount()
 {
+    String path = getFullPath() + "count.txt";
     if (index_granularity.empty())
     {
         rows_count = 0;
     }
     else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
     {
-        String path = getFullPath() + "count.txt";
         if (!Poco::File(path).exists())
             throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

@@ -506,6 +531,14 @@ void IMergeTreeDataPart::loadRowsCount()
     }
     else
     {
+        if (Poco::File(path).exists())
+        {
+            ReadBufferFromFile file = openForReading(path);
+            readIntText(rows_count, file);
+            assertEOF(file);
+            return;
+        }
+
         for (const NameAndTypePair & column : columns)
         {
             ColumnPtr column_col = column.type->createColumn();

@@ -575,7 +608,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
     Poco::File poco_file_path{path};
     if (!poco_file_path.exists())
     {
-        if (require || isCompactPart(shared_from_this()))
+        /// We can get list of columns only from columns.txt in compact parts.
+        if (require || part_type == Type::COMPACT)
             throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
 
         /// If there is no file with a list of columns, write it down.

@@ -604,26 +638,6 @@ void IMergeTreeDataPart::loadColumns(bool require)
         column_name_to_position.emplace(column.name, pos++);
 }
 
-void IMergeTreeDataPart::loadColumnSizes()
-{
-    size_t columns_num = columns.size();
-    if (columns_num == 0)
-        throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
-
-    auto column_sizes_path = getFullPath() + "columns_sizes.txt";
-    auto columns_sizes_file = Poco::File(column_sizes_path);
-    if (!columns_sizes_file.exists())
-        return;
-
-    ReadBufferFromFile buffer(column_sizes_path, columns_sizes_file.getSize());
-    auto it = columns.begin();
-    for (size_t i = 0; i < columns_num; ++i, ++it)
-        readPODBinary(columns_sizes[it->name], buffer);
-
-    assertEOF(buffer);
-}
-
 UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
 {
     Poco::File cur(from);
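
The function added at the top of this file is the heart of the commit: getColumnSize() and hasColumnFiles() are virtual, so a single base-class implementation of the minimum-size lookup can serve both part formats, and the near-identical copies in MergeTreeDataPartCompact and MergeTreeDataPartWide are deleted later in this diff (the method keeps its historical misspelling, "Minumum"). A standalone sketch of the pattern, with simplified hypothetical types; IPart and WidePart are illustrative stand-ins, not the real ClickHouse classes:

    #include <cstdint>
    #include <iostream>
    #include <limits>
    #include <map>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <vector>

    struct ColumnSize { uint64_t data_compressed = 0; };

    class IPart
    {
    public:
        virtual ~IPart() = default;

        /// Each part format reports per-column sizes in its own way.
        virtual ColumnSize getColumnSize(const std::string & name) const = 0;
        virtual bool hasColumnFiles(const std::string & name) const = 0;

        /// Shared logic, formerly duplicated in both subclasses: among the
        /// physically present columns, pick the smallest compressed one.
        std::string getColumnNameWithMinimumCompressedSize(const std::vector<std::string> & all_columns) const
        {
            const std::string * minimum_size_column = nullptr;
            uint64_t minimum_size = std::numeric_limits<uint64_t>::max();

            for (const auto & name : all_columns)
            {
                if (!hasColumnFiles(name))
                    continue;

                uint64_t size = getColumnSize(name).data_compressed;
                if (size < minimum_size)
                {
                    minimum_size = size;
                    minimum_size_column = &name;
                }
            }

            if (!minimum_size_column)
                throw std::logic_error("Could not find a column of minimum size");
            return *minimum_size_column;
        }
    };

    class WidePart : public IPart
    {
    public:
        explicit WidePart(std::map<std::string, ColumnSize> sizes_) : sizes(std::move(sizes_)) {}

        ColumnSize getColumnSize(const std::string & name) const override
        {
            auto it = sizes.find(name);
            return it == sizes.end() ? ColumnSize{} : it->second;
        }

        bool hasColumnFiles(const std::string & name) const override { return sizes.count(name) != 0; }

    private:
        std::map<std::string, ColumnSize> sizes;
    };

    int main()
    {
        WidePart part({{"id", {40}}, {"payload", {4000}}});
        std::cout << part.getColumnNameWithMinimumCompressedSize({"id", "payload"}) << '\n'; // prints "id"
    }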

IMergeTreeDataPart.h

@@ -83,7 +83,7 @@ public:
     /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
     /// If no checksums are present returns the name of the first physically existing column.
-    virtual String getColumnNameWithMinumumCompressedSize() const { return columns.front().name; }
+    String getColumnNameWithMinumumCompressedSize() const;
 
     virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0;

@@ -295,8 +295,6 @@ public:
      */
     mutable std::shared_mutex columns_lock;
 
-    ColumnSizeByName columns_sizes;
-
     /// For data in RAM ('index')
     UInt64 getIndexSizeInBytes() const;
     UInt64 getIndexSizeInAllocatedBytes() const;

@@ -320,6 +318,7 @@ protected:
     Type part_type;
 
     void removeIfNeeded();
 
     virtual void checkConsistency(bool require_part_metadata) const;
+    void checkConsistencyBase(bool require_part_metadata) const;
 
 private:
     /// In compact parts order of columns is necessary

@@ -346,8 +345,6 @@ private:
     void loadPartitionAndMinMaxIndex();
 
-    void loadColumnSizes();
-
     String getRelativePathForDetachedPart(const String & prefix) const;
 };

IMergeTreeDataPartWriter.h

@@ -92,8 +92,6 @@ public:
         return Columns(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
     }
 
-    const MergeTreeData::ColumnSizeByName & getColumnsSizes() const { return columns_sizes; }
-
     void setWrittenOffsetColumns(WrittenOffsetColumns * written_offset_columns_)
     {
         written_offset_columns = written_offset_columns_;

@@ -158,8 +156,6 @@ protected:
     bool primary_index_initialized = false;
     bool skip_indices_initialized = false;
 
-    MergeTreeData::ColumnSizeByName columns_sizes;
-
     /// To correctly write Nested elements column-by-column.
     WrittenOffsetColumns * written_offset_columns = nullptr;
 };

MergeTreeDataPartCompact.cpp

@@ -30,10 +30,6 @@
 namespace DB
 {
 
-// namespace
-// {
-// }
-
 namespace ErrorCodes
 {
     extern const int FILE_DOESNT_EXIST;

@@ -47,11 +43,6 @@ namespace ErrorCodes
 }
 
-// static ReadBufferFromFile openForReading(const String & path)
-// {
-//     return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
-// }
-
 MergeTreeDataPartCompact::MergeTreeDataPartCompact(
     MergeTreeData & storage_,
     const String & name_,

@@ -107,55 +98,21 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
         default_codec, writer_settings, computed_index_granularity);
 }
 
-ColumnSize MergeTreeDataPartCompact::getColumnSize(const String & column_name, const IDataType & /* type */) const
-{
-    auto column_size = columns_sizes.find(column_name);
-    if (column_size == columns_sizes.end())
-        return {};
-    return column_size->second;
-}
-
 ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const
 {
-    ColumnSize totals;
-    size_t marks_size = 0;
-    for (const auto & column : columns)
-    {
-        auto column_size = getColumnSize(column.name, *column.type);
-        totals.add(column_size);
-        if (!marks_size && column_size.marks)
-            marks_size = column_size.marks;
-    }
-    /// Marks are shared between all columns
-    totals.marks = marks_size;
-    return totals;
-}
-
-/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
-  * If no checksums are present returns the name of the first physically existing column.
-  */
-String MergeTreeDataPartCompact::getColumnNameWithMinumumCompressedSize() const
-{
-    const auto & storage_columns = storage.getColumns().getAllPhysical();
-    const std::string * minimum_size_column = nullptr;
-    UInt64 minimum_size = std::numeric_limits<UInt64>::max();
-
-    for (const auto & column : storage_columns)
-    {
-        if (!getColumnPosition(column.name))
-            continue;
-
-        auto size = getColumnSize(column.name, *column.type).data_compressed;
-        if (size < minimum_size)
-        {
-            minimum_size = size;
-            minimum_size_column = &column.name;
-        }
-    }
-
-    if (!minimum_size_column)
-        throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
-
-    return *minimum_size_column;
-}
+    ColumnSize total_size;
+
+    auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
+    if (bin_checksum != checksums.files.end())
+    {
+        total_size.data_compressed += bin_checksum->second.file_size;
+        total_size.data_uncompressed += bin_checksum->second.uncompressed_size;
+    }
+
+    auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);
+    if (mrk_checksum != checksums.files.end())
+        total_size.marks += mrk_checksum->second.file_size;
+
+    return total_size;
+}
 
 void MergeTreeDataPartCompact::loadIndexGranularity()
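
Because all columns of a compact part share one data file and one marks file, the total size is now read straight off the checksums of those two files instead of being accumulated per column. A minimal sketch of the idea, assuming a simplified checksums map and hard-coded file names (the real code uses the DATA_FILE_NAME constants and the part's marks_file_extension):

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    struct FileChecksum { uint64_t file_size = 0; uint64_t uncompressed_size = 0; };
    struct ColumnSize { uint64_t marks = 0; uint64_t data_compressed = 0; uint64_t data_uncompressed = 0; };

    /// Hypothetical stand-in for the part's checksums: file name -> sizes.
    using ChecksumsByFile = std::map<std::string, FileChecksum>;

    ColumnSize totalColumnsSizeFromChecksums(const ChecksumsByFile & files, const std::string & marks_extension)
    {
        ColumnSize total_size;

        /// All columns share a single compressed data file in the compact format.
        if (auto bin = files.find("data.bin"); bin != files.end())
        {
            total_size.data_compressed += bin->second.file_size;
            total_size.data_uncompressed += bin->second.uncompressed_size;
        }

        /// Marks are likewise shared by all columns.
        if (auto mrk = files.find("data" + marks_extension); mrk != files.end())
            total_size.marks += mrk->second.file_size;

        return total_size;
    }

    int main()
    {
        ChecksumsByFile files{{"data.bin", {1000, 4000}}, {"data.mrk3", {96, 0}}};
        ColumnSize s = totalColumnsSizeFromChecksums(files, ".mrk3");
        std::cout << s.data_compressed << ' ' << s.data_uncompressed << ' ' << s.marks << '\n'; // 1000 4000 96
    }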

MergeTreeDataPartCompact.h

@@ -69,12 +69,6 @@ public:
     bool isStoredOnDisk() const override { return true; }
 
-    /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
-    /// If no checksums are present returns the name of the first physically existing column.
-    String getColumnNameWithMinumumCompressedSize() const override;
-
-    ColumnSize getColumnSize(const String & name, const IDataType & type0) const override;
-
     ColumnSize getTotalColumnsSize() const override;
 
     void checkConsistency(bool /* require_part_metadata */) const override {}

MergeTreeDataPartWide.cpp

@@ -126,34 +126,6 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
     return size;
 }
 
-/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
-  * If no checksums are present returns the name of the first physically existing column.
-  */
-String MergeTreeDataPartWide::getColumnNameWithMinumumCompressedSize() const
-{
-    const auto & storage_columns = storage.getColumns().getAllPhysical();
-    const std::string * minimum_size_column = nullptr;
-    UInt64 minimum_size = std::numeric_limits<UInt64>::max();
-
-    for (const auto & column : storage_columns)
-    {
-        if (!hasColumnFiles(column.name, *column.type))
-            continue;
-
-        const auto size = getColumnSizeImpl(column.name, *column.type, nullptr).data_compressed;
-        if (size < minimum_size)
-        {
-            minimum_size = size;
-            minimum_size_column = &column.name;
-        }
-    }
-
-    if (!minimum_size_column)
-        throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
-
-    return *minimum_size_column;
-}
-
 ColumnSize MergeTreeDataPartWide::getTotalColumnsSize() const
 {
     ColumnSize totals;

MergeTreeDataPartWide.h

@@ -70,10 +70,6 @@ public:
     String getFileNameForColumn(const NameAndTypePair & column) const override;
 
-    /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
-    /// If no checksums are present returns the name of the first physically existing column.
-    String getColumnNameWithMinumumCompressedSize() const override;
-
     ColumnSize getTotalColumnsSize() const override;
 
     ColumnSize getColumnSize(const String & column_name, const IDataType & type) const override;

MergeTreeDataPartWriterCompact.cpp

@@ -105,15 +105,10 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
             if (stream->compressed.offset() >= settings.min_compress_block_size)
                 stream->compressed.next();
 
-            size_t old_uncompressed_size = stream->compressed.count();
 
             writeIntBinary(stream->plain_hashing.count(), stream->marks);
             writeIntBinary(stream->compressed.offset(), stream->marks);
 
             writeColumnSingleGranule(block.getByName(column.name), current_row, rows_to_write);
-
-            /// We can't calculate compressed size by single column in compact format.
-            size_t uncompressed_size = stream->compressed.count();
-            columns_sizes[column.name].add(ColumnSize{0, 0, uncompressed_size - old_uncompressed_size});
         }
 
         ++from_mark;

@@ -163,10 +158,6 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
         writeIntBinary(0ULL, stream->marks);
     }
 
-    size_t marks_size = stream->marks.count();
-    for (auto it = columns_sizes.begin(); it != columns_sizes.end(); ++it)
-        it->second.marks = marks_size;
-
     stream->finalize();
     if (sync)
         stream->sync();
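
The writer-side removals follow from the storage layout: every column of a granule goes through the same compressed stream, so each mark records just two offsets into the shared file and a per-column compressed size is not well defined; only the uncompressed delta was measurable, which is all the deleted columns_sizes code ever tracked. A rough sketch of the per-mark data, with hypothetical simplified types:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    /// One mark per (granule, column) in a compact part. The two fields mirror
    /// the two writeIntBinary calls above: stream->plain_hashing.count() and
    /// stream->compressed.offset().
    struct Mark
    {
        uint64_t offset_in_compressed_file = 0;
        uint64_t offset_in_decompressed_block = 0;
    };

    int main()
    {
        /// Two columns, one granule: both marks can point into the same
        /// compressed block, so the block's compressed bytes cannot be split
        /// between the columns.
        std::vector<Mark> granule_marks{{0, 0}, {0, 120}};
        for (const auto & m : granule_marks)
            std::cout << m.offset_in_compressed_file << ' ' << m.offset_in_decompressed_block << '\n';
    }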

MergedBlockOutputStream.cpp

@@ -113,7 +113,9 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
         else if (rows_count)
             throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
                 + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
+    }
+
+    {
         WriteBufferFromFile count_out(part_path + "count.txt", 4096);
         HashingWriteBuffer count_out_hashing(count_out);
         writeIntText(rows_count, count_out_hashing);

@@ -132,22 +134,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
         checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
     }
 
-    const auto & columns_sizes = writer->getColumnsSizes();
-    if (!columns_sizes.empty())
-    {
-        WriteBufferFromFile out(part_path + "columns_sizes.txt", 4096);
-        HashingWriteBuffer out_hashing(out);
-        for (const auto & column : columns_list)
-        {
-            auto it = columns_sizes.find(column.name);
-            if (it == columns_sizes.end())
-                throw Exception("Not found size for column " + column.name, ErrorCodes::LOGICAL_ERROR);
-
-            writePODBinary(it->second, out_hashing);
-            checksums.files["columns_sizes.txt"].file_size = out_hashing.count();
-            checksums.files["columns_sizes.txt"].file_hash = out_hashing.getHash();
-        }
-    }
-
     {
         /// Write a file with a description of columns.
         WriteBufferFromFile out(part_path + "columns.txt", 4096);

@@ -166,7 +152,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
     new_part->checksums = checksums;
     new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
     new_part->index_granularity = writer->getIndexGranularity();
-    new_part->columns_sizes = columns_sizes;
 }
 
 void MergedBlockOutputStream::init()