Merge pull request #10329 from CurtizJ/polymorphic-parts-2

Remove unused flags from MergedColumnOnlyOutputStream.
This commit is contained in:
alexey-milovidov 2020-04-17 21:25:21 +03:00 committed by GitHub
commit 375f0f9e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 23 additions and 47 deletions

View File

@ -107,7 +107,7 @@ public:
void initSkipIndices();
void initPrimaryIndex();
virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync = false) = 0;
virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) = 0;
void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums);
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);

View File

@ -25,7 +25,7 @@ public:
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns);
/// Remove all columns marked expired in data_part. Also, clears checksums
/// and columns array. Return set of removed files names.

View File

@ -876,9 +876,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergedColumnOnlyOutputStream column_to(
new_data_part,
column_gathered_stream.getHeader(),
false,
compression_codec,
false,
/// we don't need to recalc indices here
/// because all of them were already recalculated and written
/// as key part of vertical merge
@ -1588,9 +1586,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
MergedColumnOnlyOutputStream out(
new_data_part,
mutation_header,
/* sync = */ false,
compression_codec,
/* skip_offsets = */ false,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
nullptr,
source_part->index_granularity,

View File

@ -20,7 +20,6 @@ class MergeTreeDataPartCompact : public IMergeTreeDataPart
public:
static constexpr auto DATA_FILE_NAME = "data";
static constexpr auto DATA_FILE_EXTENSION = ".bin";
static constexpr auto TEMP_FILE_SUFFIX = "_temp";
static constexpr auto DATA_FILE_NAME_WITH_EXTENSION = "data.bin";
MergeTreeDataPartCompact(

View File

@ -22,8 +22,6 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
{
using DataPart = MergeTreeDataPartCompact;
String data_file_name = DataPart::DATA_FILE_NAME;
if (settings.is_writing_temp_files)
data_file_name += DataPart::TEMP_FILE_SUFFIX;
stream = std::make_unique<Stream>(
data_file_name,
@ -145,7 +143,7 @@ void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTy
column.type->serializeBinaryBulkStateSuffix(serialize_settings, state);
}
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums)
{
if (columns_buffer.size() != 0)
writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns()));
@ -161,8 +159,6 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
}
stream->finalize();
if (sync)
stream->sync();
stream->addToChecksums(checksums);
stream.reset();
}

View File

@ -21,7 +21,7 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override;
private:
/// Write single granule of one column (rows between 2 marks)

View File

@ -39,9 +39,6 @@ void MergeTreeDataPartWriterWide::addStreams(
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
{
if (settings.skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type.
if (column_streams.count(stream_name))
@ -69,8 +66,6 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
return [&, this] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && settings.skip_offsets)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -135,8 +130,6 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && settings.skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -177,8 +170,6 @@ size_t MergeTreeDataPartWriterWide::writeSingleGranule(
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && settings.skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -270,7 +261,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
next_index_offset = current_row - total_rows;
}
void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums)
{
const auto & global_settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
@ -300,8 +291,6 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
for (auto & stream : column_streams)
{
stream.second->finalize();
if (sync)
stream.second->sync();
stream.second->addToChecksums(checksums);
}

View File

@ -24,7 +24,7 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns);

View File

@ -30,10 +30,7 @@ struct MergeTreeWriterSettings
size_t aio_threshold;
bool can_use_adaptive_granularity;
bool blocks_are_granules_size;
/// true if we write temporary files during alter.
bool is_writing_temp_files = false;
size_t estimated_size = 0;
/// used when ALTERing columns if we know that array offsets are not altered.
bool skip_offsets = false;
};
}

View File

@ -8,15 +8,14 @@ namespace ErrorCodes
}
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeDataPartPtr & data_part, const Block & header_, bool sync_,
CompressionCodecPtr default_codec, bool skip_offsets_,
const MergeTreeDataPartPtr & data_part,
const Block & header_,
CompressionCodecPtr default_codec,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
WrittenOffsetColumns * offset_columns_,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info,
bool is_writing_temp_files)
: IMergedBlockOutputStream(data_part),
header(header_), sync(sync_)
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part), header(header_)
{
const auto & global_settings = data_part->storage.global_context.getSettings();
MergeTreeWriterSettings writer_settings(
@ -24,11 +23,13 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(),
global_settings.min_bytes_to_use_direct_io);
writer_settings.is_writing_temp_files = is_writing_temp_files;
writer_settings.skip_offsets = skip_offsets_;
writer = data_part->getWriter(
header.getNamesAndTypesList(),
indices_to_recalc,
default_codec,
std::move(writer_settings),
index_granularity);
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc,
default_codec,std::move(writer_settings), index_granularity);
writer->setWrittenOffsetColumns(offset_columns_);
writer->initSkipIndices();
}
@ -62,7 +63,7 @@ MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(MergeTreeData::MutableD
{
/// Finish columns serialization.
MergeTreeData::DataPart::Checksums checksums;
writer->finishDataSerialization(checksums, sync);
writer->finishDataSerialization(checksums);
writer->finishSkipIndicesSerialization(checksums);
auto columns = new_part->getColumns();

View File

@ -11,17 +11,16 @@ class MergeTreeDataPartWriterWide;
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{
public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
/// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream(
const MergeTreeDataPartPtr & data_part, const Block & header_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
const MergeTreeDataPartPtr & data_part,
const Block & header_,
CompressionCodecPtr default_codec_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
WrittenOffsetColumns * offset_columns_ = nullptr,
const MergeTreeIndexGranularity & index_granularity = {},
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr,
bool is_writing_temp_files = false);
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -31,7 +30,6 @@ public:
private:
Block header;
bool sync;
};