CLICKHOUSE-3971: Write data with adaptive index granularity

This commit is contained in:
alesapin 2018-11-12 19:14:37 +03:00
parent 5f3dcf794a
commit 6f5fef5344
7 changed files with 64 additions and 20 deletions

View File

@ -100,6 +100,7 @@ MergeTreeData::MergeTreeData(
context(context_),
sampling_expression(sampling_expression_),
index_granularity(settings_.index_granularity),
index_granularity_bytes(settings_.index_granularity_bytes),
merging_params(merging_params_),
settings(settings_),
primary_expr_ast(primary_expr_ast_),
@ -113,6 +114,7 @@ MergeTreeData::MergeTreeData(
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
{
std::cerr << "LOADING PART\n";
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumns().getAllPhysical());
@ -121,11 +123,11 @@ MergeTreeData::MergeTreeData(
initPrimaryKey();
size_t min_format_version(0);
if (sampling_expression && (!primary_key_sample.has(sampling_expression->getColumnName()))
&& !attach && !settings.compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility.
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
MergeTreeDataFormatVersion min_format_version(0);
if (!date_column_name.empty())
{
try
@ -146,11 +148,15 @@ MergeTreeData::MergeTreeData(
e.addMessage("(while initializing MergeTree partition key from date column `" + date_column_name + "`)");
throw;
}
format_version = 0;
}
else
{
initPartitionKey();
if (settings_.index_granularity_bytes != 0)
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY;
else
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
initPartitionKey();
}
auto path_exists = Poco::File(full_path).exists();
@ -179,9 +185,19 @@ MergeTreeData::MergeTreeData(
format_version = 0;
if (format_version < min_format_version)
{
if (min_format_version == MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING.toUnderType())
throw Exception(
"MergeTree data format version on disk doesn't support custom partitioning",
ErrorCodes::METADATA_MISMATCH);
else if (min_format_version == MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY.toUnderType())
throw Exception(
"MergeTree data format version on disk doesn't support adaptive index granularity",
ErrorCodes::METADATA_MISMATCH);
}
}

View File

@ -542,6 +542,7 @@ public:
Context & context;
const ASTPtr sampling_expression;
const size_t index_granularity;
const size_t index_granularity_bytes;
/// Merging params - what additional actions to perform during merge.
const MergingParams merging_params;

View File

@ -10,4 +10,6 @@ STRONG_TYPEDEF(UInt32, MergeTreeDataFormatVersion)
const MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING {1};
const MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY {2};
}

View File

@ -526,6 +526,7 @@ void MergeTreeDataPart::loadChecksums(bool require)
assertEOF(file);
}
/// TODO alesap
void MergeTreeDataPart::loadRowsCount()
{
if (marks_count == 0)

View File

@ -149,7 +149,10 @@ struct MergeTreeSettings
M(SettingUInt64, finished_mutations_to_keep, 100) \
\
/** Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled) */ \
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024)
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024) \
\
/** Approximate amount of bytes in single granule (0 - disabled) */ \
M(SettingUInt64, index_granularity_bytes, 0)
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \

View File

@ -15,7 +15,8 @@ namespace
{
constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto MARKS_FILE_EXTENSION = ".mrk";
constexpr auto MARKS_FILE_EXTENSION_WITH_FIXED_GRANULARITY = ".mrk";
constexpr auto MARKS_FILE_EXTENSION_WITH_ADAPTIVE_INDEX_GRANULARITY = ".mrk2";
}
@ -55,10 +56,14 @@ void IMergedBlockOutputStream::addStreams(
if (column_streams.count(stream_name))
return;
std::string marks_file_extension = MARKS_FILE_EXTENSION_WITH_FIXED_GRANULARITY;
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY)
marks_file_extension = MARKS_FILE_EXTENSION_WITH_ADAPTIVE_INDEX_GRANULARITY;
column_streams[stream_name] = std::make_unique<ColumnStream>(
stream_name,
path + stream_name, DATA_FILE_EXTENSION,
path + stream_name, MARKS_FILE_EXTENSION,
path + stream_name, marks_file_extension,
max_compress_block_size,
compression_settings,
estimated_size,
@ -89,6 +94,15 @@ IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
};
}
size_t IMergedBlockOutputStream::getBlockIndexGranularity(const Block & block) const
{
if (storage.index_granularity_bytes == 0 || storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY)
return storage.index_granularity;
size_t blockSize = block.allocatedBytes();
return std::max(blockSize / storage.index_granularity_bytes, 1);
}
void IMergedBlockOutputStream::writeData(
const String & name,
@ -96,7 +110,8 @@ void IMergedBlockOutputStream::writeData(
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state)
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t index_granularity)
{
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
@ -115,7 +130,7 @@ void IMergedBlockOutputStream::writeData(
limit = index_offset;
else
{
limit = storage.index_granularity;
limit = index_granularity;
/// Write marks.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
@ -138,6 +153,8 @@ void IMergedBlockOutputStream::writeData(
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY)
writeIntBinary(index_granularity, stream.marks);
}, serialize_settings.path);
}
@ -394,6 +411,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
{
block.checkNumberOfRows();
size_t rows = block.rows();
size_t current_block_index_granularity = getBlockIndexGranularity(block);
/// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
WrittenOffsetColumns offset_columns;
@ -451,18 +469,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
if (primary_columns_name_to_position.end() != primary_column_it)
{
auto & primary_column = *primary_columns[primary_column_it->second].column;
writeData(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i]);
writeData(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_block_index_granularity);
}
else
{
/// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
ColumnPtr permuted_column = column.column->permute(*permutation, 0);
writeData(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i]);
writeData(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_block_index_granularity);
}
}
else
{
writeData(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i]);
writeData(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_block_index_granularity);
}
}
@ -478,7 +496,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (size_t i = index_offset; i < rows; i += storage.index_granularity)
for (size_t i = index_offset; i < rows; i += current_block_index_granularity)
{
if (storage.hasPrimaryKey())
{
@ -494,8 +512,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;
index_offset = (storage.index_granularity - written_for_last_mark) % storage.index_granularity;
size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity;
index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity;
}
@ -537,17 +555,18 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
initialized = true;
}
size_t current_block_index_granularity = getBlockIndexGranularity(block);
size_t rows = block.rows();
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i]);
writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_block_index_granularity);
}
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;
index_offset = (storage.index_granularity - written_for_last_mark) % storage.index_granularity;
size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity;
index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity;
}
void MergedColumnOnlyOutputStream::writeSuffix()

View File

@ -72,7 +72,9 @@ protected:
/// Write data of one column.
void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenOffsetColumns & offset_columns,
bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state);
bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state, size_t index_granularity);
size_t getBlockIndexGranularity(const Block & block) const;
MergeTreeData & storage;