Mechanical refactoring

alesapin 2020-12-09 21:10:09 +03:00
parent 7b88f9a115
commit af73db93e6
10 changed files with 73 additions and 94 deletions


@@ -48,9 +48,6 @@ public:
/// calling calculations of primary and skip indices.
void next();
virtual void initSkipIndices() {}
virtual void initPrimaryIndex() {}
virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) = 0;
virtual void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & /* checksums */, bool /* sync */) {}
virtual void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & /* checksums */, bool /* sync */) {}
@@ -63,17 +60,14 @@ protected:
size_t getCurrentMark() const { return current_mark; }
size_t getIndexOffset() const { return index_offset; }
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
MergeTreeData::DataPartPtr data_part;
const MergeTreeData::DataPartPtr data_part;
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
NamesAndTypesList columns_list;
MergeTreeIndices skip_indices;
const StorageMetadataPtr metadata_snapshot;
const NamesAndTypesList columns_list;
const MergeTreeIndices skip_indices;
MergeTreeIndexGranularity index_granularity;
MergeTreeWriterSettings settings;
bool with_final_mark;
const MergeTreeWriterSettings settings;
const bool with_final_mark;
size_t next_mark = 0;
size_t next_index_offset = 0;
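
The hunk above turns the writer's per-part state (data_part, metadata_snapshot, columns_list, skip_indices, settings, with_final_mark) into const data members. A minimal sketch of what that buys, with hypothetical names:

#include <string>
#include <utility>

/// Minimal sketch (hypothetical names): a const data member can only be set
/// in the constructor's initializer list and can never be reassigned, so the
/// compiler enforces that per-part state stays fixed for the writer's lifetime.
class PartWriter
{
public:
    explicit PartWriter(std::string part_path_)
        : part_path(std::move(part_path_))
    {
    }

    void rename(const std::string & /* new_path */)
    {
        /// part_path = new_path;  -- would not compile: assignment to a const member
    }

private:
    const std::string part_path;
};

One side effect: const members delete the implicitly generated assignment operators, which is harmless for an object created once per part.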


@@ -118,6 +118,30 @@ void MergeTreeDataPartWriterCompact::write(
writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns()));
}
namespace
{
/// Write single granule of one column (rows between 2 marks)
void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
IDataType::OutputStreamGetter stream_getter,
size_t from_row,
size_t number_of_rows)
{
IDataType::SerializeBinaryBulkStatePtr state;
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = stream_getter;
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
column.type->serializeBinaryBulkStatePrefix(serialize_settings, state);
column.type->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, state);
}
}
void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
{
size_t total_rows = block.rows();
@@ -186,24 +210,6 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
next_mark = from_mark;
}
void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
IDataType::OutputStreamGetter stream_getter,
size_t from_row,
size_t number_of_rows)
{
IDataType::SerializeBinaryBulkStatePtr state;
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = stream_getter;
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
column.type->serializeBinaryBulkStatePrefix(serialize_settings, state);
column.type->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, state);
}
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
{
if (columns_buffer.size() != 0)
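
In the file above, writeColumnSingleGranule moves from a static member function into an anonymous namespace at the top of the .cpp (its old body is deleted further down, and its declaration leaves the header in the next file). A minimal sketch of the same move on a hypothetical helper:

#include <cstddef>

/// Minimal sketch (hypothetical names): a helper that used to be a static
/// member declared in the header becomes a free function in an anonymous
/// namespace. Internal linkage keeps it a private detail of this .cpp file.
namespace
{
    size_t roundUpToGranule(size_t rows, size_t granule_size)
    {
        return (rows + granule_size - 1) / granule_size * granule_size;
    }
}

size_t padBlockRows(size_t rows)
{
    return roundUpToGranule(rows, 8192);  /// callers inside this file are unchanged
}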


@@ -23,10 +23,9 @@ public:
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
protected:
private:
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
private:
void writeBlock(const Block & block);
void addToChecksums(MergeTreeDataPartChecksums & checksums);
@@ -78,13 +77,6 @@ private:
/// marks -> marks_file
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks;
/// Write single granule of one column (rows between 2 marks)
static void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
IDataType::OutputStreamGetter stream_getter,
size_t from_row,
size_t number_of_rows);
};
}


@@ -89,6 +89,9 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
auto disk = data_part->volume->getDisk();
if (!disk->exists(part_path))
disk->createDirectories(part_path);
initSkipIndices();
initPrimaryIndex();
}
// Implementation is split into static functions for the ability
@@ -129,17 +132,7 @@ static size_t computeIndexGranularityImpl(
return index_granularity_for_block;
}
static void fillIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity,
size_t index_offset,
size_t index_granularity_for_block,
size_t rows_in_block)
{
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.appendMark(index_granularity_for_block);
}
size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block)
size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block) const
{
const auto storage_settings = storage.getSettings();
return computeIndexGranularityImpl(
@@ -150,15 +143,6 @@ size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block)
settings.can_use_adaptive_granularity);
}
void MergeTreeDataPartWriterOnDisk::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
fillIndexGranularityImpl(
index_granularity,
getIndexOffset(),
index_granularity_for_block,
rows_in_block);
}
void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
{
if (metadata_snapshot->hasPrimaryKey())
@@ -166,8 +150,6 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
primary_index_initialized = true;
}
void MergeTreeDataPartWriterOnDisk::initSkipIndices()
@@ -186,15 +168,10 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
skip_index_filling.push_back(0);
}
skip_indices_initialized = true;
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
{
if (!primary_index_initialized)
throw Exception("Primary index is not initialized", ErrorCodes::LOGICAL_ERROR);
size_t rows = primary_index_block.rows();
size_t primary_columns_num = primary_index_block.columns();
if (index_columns.empty())
@@ -241,9 +218,6 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block & skip_indexes_block)
{
if (!skip_indices_initialized)
throw Exception("Skip indices are not initialized", ErrorCodes::LOGICAL_ERROR);
size_t rows = skip_indexes_block.rows();
size_t skip_index_current_data_mark = 0;
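
Above, initialization of the primary index and skip indices now runs unconditionally in the constructor, which is why the primary_index_initialized / skip_indices_initialized flags and their LOGICAL_ERROR guards are deleted. A minimal sketch of the resulting shape, with hypothetical names:

#include <cstddef>
#include <vector>

/// Minimal sketch (hypothetical names): once initialization happens in the
/// constructor, the object can never be observed half-initialized, so the
/// "initialized" flags and their runtime checks become dead code.
class IndexWriter
{
public:
    IndexWriter()
    {
        initIndices();  /// non-virtual, so calling it from a constructor is safe
    }

    void calculateIndices(size_t rows)
    {
        marks.push_back(rows);  /// no `if (!initialized) throw ...` guard needed
    }

private:
    void initIndices() { marks.reserve(16); }

    std::vector<size_t> marks;
};

This only works because initPrimaryIndex and initSkipIndices stop being virtual (see the header below): a virtual call made from a constructor never dispatches to a derived override.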


@@ -74,13 +74,6 @@ public:
void calculateAndSerializePrimaryIndex(const Block & primary_index_block) final;
void calculateAndSerializeSkipIndices(const Block & skip_indexes_block) final;
/// Count index_granularity for block and store in `index_granularity`
size_t computeIndexGranularity(const Block & block);
virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block);
void initSkipIndices() final;
void initPrimaryIndex() final;
void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync) final;
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync) final;
@@ -90,15 +83,14 @@ public:
}
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
/// Count index_granularity for block and store in `index_granularity`
size_t computeIndexGranularity(const Block & block) const;
String part_path;
const String part_path;
const String marks_file_extension;
CompressionCodecPtr default_codec;
const CompressionCodecPtr default_codec;
bool compute_granularity;
bool need_finish_last_granule;
const bool compute_granularity;
/// Number of marks in data from which skip indices have to start
/// aggregation. I.e. it's data mark number, not skip indices mark.
@@ -116,13 +108,15 @@ protected:
Columns last_block_index_columns;
bool data_written = false;
bool primary_index_initialized = false;
bool skip_indices_initialized = false;
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns * written_offset_columns = nullptr;
private:
void initSkipIndices();
void initPrimaryIndex();
virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) = 0;
/// Index is already serialized up to this mark.
size_t index_mark = 0;
};
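
The header changes above make initSkipIndices and initPrimaryIndex private non-virtual, and turn fillIndexGranularity into a pure virtual hook that every concrete writer must implement. A minimal sketch of that hook pattern, with hypothetical names:

#include <cstddef>
#include <vector>

/// Minimal sketch (hypothetical names): the base class exposes a non-virtual
/// entry point and keeps the customization point pure virtual, so each part
/// format supplies its own mark layout.
class WriterBase
{
public:
    virtual ~WriterBase() = default;

    void fill(size_t granularity_for_block, size_t rows_in_block)
    {
        fillIndexGranularity(granularity_for_block, rows_in_block);
    }

private:
    virtual void fillIndexGranularity(size_t granularity_for_block, size_t rows_in_block) = 0;
};

class WideWriter : public WriterBase
{
private:
    void fillIndexGranularity(size_t granularity_for_block, size_t rows_in_block) override
    {
        for (size_t row = 0; row < rows_in_block; row += granularity_for_block)
            marks.push_back(granularity_for_block);  /// one mark per granule
    }

    std::vector<size_t> marks;
};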


@@ -69,7 +69,7 @@ void MergeTreeDataPartWriterWide::addStreams(
IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
const String & name, WrittenOffsetColumns & offset_columns)
const String & name, WrittenOffsetColumns & offset_columns) const
{
return [&, this] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
@@ -81,7 +81,7 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
if (is_offsets && offset_columns.count(stream_name))
return nullptr;
return &column_streams[stream_name]->compressed;
return &column_streams.at(stream_name)->compressed;
};
}
@@ -331,4 +331,23 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
}, path);
}
static void fillIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity,
size_t index_offset,
size_t index_granularity_for_block,
size_t rows_in_block)
{
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.appendMark(index_granularity_for_block);
}
void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
fillIndexGranularityImpl(
index_granularity,
getIndexOffset(),
index_granularity_for_block,
rows_in_block);
}
}
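
Two changes in this file deserve a note. fillIndexGranularityImpl and its wrapper arrive here verbatim from MergeTreeDataPartWriterOnDisk.cpp, so the wide writer now owns its mark layout. And since createStreamGetter is now const, the lookup inside the returned lambda switches from operator[] to at(): on a std::map, operator[] is non-const because it may insert a default-constructed value for a missing key, while at() has a const overload and throws on a miss. A minimal sketch of that difference:

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>

/// Minimal sketch: std::map::operator[] default-inserts on a missing key,
/// while at() throws std::out_of_range instead of hiding the bug.
int main()
{
    std::map<std::string, int> streams{{"col.bin", 1}};

    std::cout << streams["typo.bin"] << '\n';  /// quietly inserts {"typo.bin", 0} and prints 0

    try
    {
        std::cout << streams.at("typo2.bin") << '\n';
    }
    catch (const std::out_of_range & e)
    {
        std::cout << "no such stream: " << e.what() << '\n';
    }
    return 0;
}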


@@ -8,9 +8,6 @@ namespace DB
class MergeTreeDataPartWriterWide : public MergeTreeDataPartWriterOnDisk
{
public:
using ColumnToSize = std::map<std::string, UInt64>;
MergeTreeDataPartWriterWide(
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
@@ -26,8 +23,6 @@ public:
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns);
private:
/// Write data of one column.
/// Return how many marks were written and
@@ -70,6 +65,13 @@ private:
const ASTPtr & effective_codec_desc,
size_t estimated_size);
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const;
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
SerializationStates serialization_states;
using ColumnStreams = std::map<String, StreamPtr>;


@@ -66,8 +66,6 @@ MergedBlockOutputStream::MergedBlockOutputStream(
volume->getDisk()->createDirectories(part_path);
writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, default_codec, writer_settings);
writer->initPrimaryIndex();
writer->initSkipIndices();
}
/// If data is pre-sorted.


@@ -40,7 +40,6 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
throw Exception("MergedColumnOnlyOutputStream supports only parts stored on disk", ErrorCodes::NOT_IMPLEMENTED);
writer_on_disk->setWrittenOffsetColumns(offset_columns_);
writer_on_disk->initSkipIndices();
}
void MergedColumnOnlyOutputStream::write(const Block & block)


@@ -4,6 +4,7 @@
// I know that inclusion of .cpp is not good at all
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp> // NOLINT
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.cpp> // NOLINT
using namespace DB;
static Block getBlockWithSize(size_t required_size_in_bytes, size_t size_of_row_in_bytes)
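
The test includes .cpp files (hence the NOLINT and the author's apology) because static and anonymous-namespace functions such as fillIndexGranularityImpl have internal linkage: compiling those sources into the test's own translation unit is the only way to call them directly. A minimal sketch of the trick, with hypothetical names:

#include <cstddef>

/// Minimal sketch (hypothetical names): file-local functions are invisible to
/// other translation units, so the test pulls in the whole .cpp.

/// --- granularity.cpp ---
namespace
{
    size_t granuleCount(size_t rows, size_t granularity)
    {
        return (rows + granularity - 1) / granularity;
    }
}

/// --- granularity_test.cpp ---
/// #include "granularity.cpp" // NOLINT -- deliberate, mirroring the includes above
int main()
{
    return granuleCount(10, 8) == 2 ? 0 : 1;  /// 10 rows at granularity 8 -> 2 granules
}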