From 6f5fef5344603be7e2b3706a74ab3646679f9ba2 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 12 Nov 2018 19:14:37 +0300 Subject: [PATCH] CLICKHOUSE-3971: Write data with adaptive index granularity --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 26 ++++++++--- dbms/src/Storages/MergeTree/MergeTreeData.h | 1 + .../MergeTree/MergeTreeDataFormatVersion.h | 2 + .../Storages/MergeTree/MergeTreeDataPart.cpp | 1 + .../Storages/MergeTree/MergeTreeSettings.h | 5 ++- .../MergeTree/MergedBlockOutputStream.cpp | 45 +++++++++++++------ .../MergeTree/MergedBlockOutputStream.h | 4 +- 7 files changed, 64 insertions(+), 20 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 524b8bfe8bf..36a3d89fbc4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -100,6 +100,7 @@ MergeTreeData::MergeTreeData( context(context_), sampling_expression(sampling_expression_), index_granularity(settings_.index_granularity), + index_granularity_bytes(settings_.index_granularity_bytes), merging_params(merging_params_), settings(settings_), primary_expr_ast(primary_expr_ast_), @@ -113,6 +114,7 @@ MergeTreeData::MergeTreeData( data_parts_by_info(data_parts_indexes.get()), data_parts_by_state_and_info(data_parts_indexes.get()) { + std::cerr << "LOADING PART\n"; /// NOTE: using the same columns list as is read when performing actual merges. merging_params.check(getColumns().getAllPhysical()); @@ -121,11 +123,11 @@ MergeTreeData::MergeTreeData( initPrimaryKey(); + size_t min_format_version(0); if (sampling_expression && (!primary_key_sample.has(sampling_expression->getColumnName())) && !attach && !settings.compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility. throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS); - MergeTreeDataFormatVersion min_format_version(0); if (!date_column_name.empty()) { try @@ -146,11 +148,15 @@ MergeTreeData::MergeTreeData( e.addMessage("(while initializing MergeTree partition key from date column `" + date_column_name + "`)"); throw; } + format_version = 0; } else { + if (settings_.index_granularity_bytes != 0) + min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY; + else + min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING; initPartitionKey(); - min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING; } auto path_exists = Poco::File(full_path).exists(); @@ -179,9 +185,19 @@ MergeTreeData::MergeTreeData( format_version = 0; if (format_version < min_format_version) - throw Exception( - "MergeTree data format version on disk doesn't support custom partitioning", - ErrorCodes::METADATA_MISMATCH); + { + if (min_format_version == MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING.toUnderType()) + throw Exception( + "MergeTree data format version on disk doesn't support custom partitioning", + ErrorCodes::METADATA_MISMATCH); + else if (min_format_version == MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY.toUnderType()) + throw Exception( + "MergeTree data format version on disk doesn't support adaptive index granularity", + ErrorCodes::METADATA_MISMATCH); + + + } + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 5ad413f21f8..31d1581da01 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -542,6 +542,7 @@ public: Context & context; const ASTPtr sampling_expression; const size_t index_granularity; + const size_t index_granularity_bytes; /// Merging params - what additional actions to perform during merge. const MergingParams merging_params; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataFormatVersion.h b/dbms/src/Storages/MergeTree/MergeTreeDataFormatVersion.h index 4b492a9fb61..02e16ace868 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataFormatVersion.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataFormatVersion.h @@ -10,4 +10,6 @@ STRONG_TYPEDEF(UInt32, MergeTreeDataFormatVersion) const MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING {1}; +const MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY {2}; + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 1c3d21d4653..5aa32e1f584 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -526,6 +526,7 @@ void MergeTreeDataPart::loadChecksums(bool require) assertEOF(file); } +/// TODO alesap void MergeTreeDataPart::loadRowsCount() { if (marks_count == 0) diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index f7fa9bf6703..c8040104ef1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -149,7 +149,10 @@ struct MergeTreeSettings M(SettingUInt64, finished_mutations_to_keep, 100) \ \ /** Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled) */ \ - M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024) + M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024) \ + \ + /** Approximate amount of bytes in single granule (0 - disabled) */ \ + M(SettingUInt64, index_granularity_bytes, 0) /// Settings that should not change after the creation of a table. #define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \ diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index c12ddc51381..e13e04d92dc 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -15,7 +15,8 @@ namespace { constexpr auto DATA_FILE_EXTENSION = ".bin"; -constexpr auto MARKS_FILE_EXTENSION = ".mrk"; +constexpr auto MARKS_FILE_EXTENSION_WITH_FIXED_GRANULARITY = ".mrk"; +constexpr auto MARKS_FILE_EXTENSION_WITH_ADAPTIVE_INDEX_GRANULARITY = ".mrk2"; } @@ -55,10 +56,14 @@ void IMergedBlockOutputStream::addStreams( if (column_streams.count(stream_name)) return; + std::string marks_file_extension = MARKS_FILE_EXTENSION_WITH_FIXED_GRANULARITY; + if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY) + marks_file_extension = MARKS_FILE_EXTENSION_WITH_ADAPTIVE_INDEX_GRANULARITY; + column_streams[stream_name] = std::make_unique( stream_name, path + stream_name, DATA_FILE_EXTENSION, - path + stream_name, MARKS_FILE_EXTENSION, + path + stream_name, marks_file_extension, max_compress_block_size, compression_settings, estimated_size, @@ -89,6 +94,15 @@ IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter( }; } +size_t IMergedBlockOutputStream::getBlockIndexGranularity(const Block & block) const +{ + if (storage.index_granularity_bytes == 0 || storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY) + return storage.index_granularity; + + size_t blockSize = block.allocatedBytes(); + return std::max(blockSize / storage.index_granularity_bytes, 1); +} + void IMergedBlockOutputStream::writeData( const String & name, @@ -96,7 +110,8 @@ void IMergedBlockOutputStream::writeData( const IColumn & column, WrittenOffsetColumns & offset_columns, bool skip_offsets, - IDataType::SerializeBinaryBulkStatePtr & serialization_state) + IDataType::SerializeBinaryBulkStatePtr & serialization_state, + size_t index_granularity) { auto & settings = storage.context.getSettingsRef(); IDataType::SerializeBinaryBulkSettings serialize_settings; @@ -115,7 +130,7 @@ void IMergedBlockOutputStream::writeData( limit = index_offset; else { - limit = storage.index_granularity; + limit = index_granularity; /// Write marks. type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path) @@ -138,6 +153,8 @@ void IMergedBlockOutputStream::writeData( writeIntBinary(stream.plain_hashing.count(), stream.marks); writeIntBinary(stream.compressed.offset(), stream.marks); + if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY) + writeIntBinary(index_granularity, stream.marks); }, serialize_settings.path); } @@ -394,6 +411,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm { block.checkNumberOfRows(); size_t rows = block.rows(); + size_t current_block_index_granularity = getBlockIndexGranularity(block); /// The set of written offset columns so that you do not write shared offsets of nested structures columns several times WrittenOffsetColumns offset_columns; @@ -451,18 +469,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm if (primary_columns_name_to_position.end() != primary_column_it) { auto & primary_column = *primary_columns[primary_column_it->second].column; - writeData(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i]); + writeData(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_block_index_granularity); } else { /// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM. ColumnPtr permuted_column = column.column->permute(*permutation, 0); - writeData(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i]); + writeData(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_block_index_granularity); } } else { - writeData(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i]); + writeData(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_block_index_granularity); } } @@ -478,7 +496,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); /// Write index. The index contains Primary Key value for each `index_granularity` row. - for (size_t i = index_offset; i < rows; i += storage.index_granularity) + for (size_t i = index_offset; i < rows; i += current_block_index_granularity) { if (storage.hasPrimaryKey()) { @@ -494,8 +512,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm } } - size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity; - index_offset = (storage.index_granularity - written_for_last_mark) % storage.index_granularity; + size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity; + index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity; } @@ -537,17 +555,18 @@ void MergedColumnOnlyOutputStream::write(const Block & block) initialized = true; } + size_t current_block_index_granularity = getBlockIndexGranularity(block); size_t rows = block.rows(); WrittenOffsetColumns offset_columns = already_written_offset_columns; for (size_t i = 0; i < block.columns(); ++i) { const ColumnWithTypeAndName & column = block.safeGetByPosition(i); - writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i]); + writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_block_index_granularity); } - size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity; - index_offset = (storage.index_granularity - written_for_last_mark) % storage.index_granularity; + size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity; + index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity; } void MergedColumnOnlyOutputStream::writeSuffix() diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h index a3f6a025c31..608fa570b3b 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -72,7 +72,9 @@ protected: /// Write data of one column. void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenOffsetColumns & offset_columns, - bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state); + bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state, size_t index_granularity); + + size_t getBlockIndexGranularity(const Block & block) const; MergeTreeData & storage;