remove unused flags

This commit is contained in:
Anton Popov 2020-04-17 14:59:10 +03:00
parent 1c3458dbae
commit f7b7b5eb74
11 changed files with 23 additions and 47 deletions

View File

@ -107,7 +107,7 @@ public:
void initSkipIndices(); void initSkipIndices();
void initPrimaryIndex(); void initPrimaryIndex();
virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync = false) = 0; virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) = 0;
void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums); void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums);
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums); void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);

View File

@ -25,7 +25,7 @@ public:
protected: protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr; using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets); IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns);
/// Remove all columns marked expired in data_part. Also, clears checksums /// Remove all columns marked expired in data_part. Also, clears checksums
/// and columns array. Return set of removed files names. /// and columns array. Return set of removed files names.

View File

@ -876,9 +876,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergedColumnOnlyOutputStream column_to( MergedColumnOnlyOutputStream column_to(
new_data_part, new_data_part,
column_gathered_stream.getHeader(), column_gathered_stream.getHeader(),
false,
compression_codec, compression_codec,
false,
/// we don't need to recalc indices here /// we don't need to recalc indices here
/// because all of them were already recalculated and written /// because all of them were already recalculated and written
/// as key part of vertical merge /// as key part of vertical merge
@ -1588,9 +1586,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
MergedColumnOnlyOutputStream out( MergedColumnOnlyOutputStream out(
new_data_part, new_data_part,
mutation_header, mutation_header,
/* sync = */ false,
compression_codec, compression_codec,
/* skip_offsets = */ false,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()), std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
nullptr, nullptr,
source_part->index_granularity, source_part->index_granularity,

View File

@ -20,7 +20,6 @@ class MergeTreeDataPartCompact : public IMergeTreeDataPart
public: public:
static constexpr auto DATA_FILE_NAME = "data"; static constexpr auto DATA_FILE_NAME = "data";
static constexpr auto DATA_FILE_EXTENSION = ".bin"; static constexpr auto DATA_FILE_EXTENSION = ".bin";
static constexpr auto TEMP_FILE_SUFFIX = "_temp";
static constexpr auto DATA_FILE_NAME_WITH_EXTENSION = "data.bin"; static constexpr auto DATA_FILE_NAME_WITH_EXTENSION = "data.bin";
MergeTreeDataPartCompact( MergeTreeDataPartCompact(

View File

@ -22,8 +22,6 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
{ {
using DataPart = MergeTreeDataPartCompact; using DataPart = MergeTreeDataPartCompact;
String data_file_name = DataPart::DATA_FILE_NAME; String data_file_name = DataPart::DATA_FILE_NAME;
if (settings.is_writing_temp_files)
data_file_name += DataPart::TEMP_FILE_SUFFIX;
stream = std::make_unique<Stream>( stream = std::make_unique<Stream>(
data_file_name, data_file_name,
@ -145,7 +143,7 @@ void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTy
column.type->serializeBinaryBulkStateSuffix(serialize_settings, state); column.type->serializeBinaryBulkStateSuffix(serialize_settings, state);
} }
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums)
{ {
if (columns_buffer.size() != 0) if (columns_buffer.size() != 0)
writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns())); writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns()));
@ -161,8 +159,6 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
} }
stream->finalize(); stream->finalize();
if (sync)
stream->sync();
stream->addToChecksums(checksums); stream->addToChecksums(checksums);
stream.reset(); stream.reset();
} }

View File

@ -21,7 +21,7 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation, void write(const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block) override; const Block & primary_key_block, const Block & skip_indexes_block) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override; void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override;
private: private:
/// Write single granule of one column (rows between 2 marks) /// Write single granule of one column (rows between 2 marks)

View File

@ -39,9 +39,6 @@ void MergeTreeDataPartWriterWide::addStreams(
{ {
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path) IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
{ {
if (settings.skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path); String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type. /// Shared offsets for Nested type.
if (column_streams.count(stream_name)) if (column_streams.count(stream_name))
@ -69,8 +66,6 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
return [&, this] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer * return [&, this] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{ {
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && settings.skip_offsets)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path); String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -135,8 +130,6 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path) type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{ {
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && settings.skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path); String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -177,8 +170,6 @@ size_t MergeTreeDataPartWriterWide::writeSingleGranule(
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path) type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{ {
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && settings.skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path); String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -270,7 +261,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
next_index_offset = current_row - total_rows; next_index_offset = current_row - total_rows;
} }
void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums)
{ {
const auto & global_settings = storage.global_context.getSettingsRef(); const auto & global_settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings; IDataType::SerializeBinaryBulkSettings serialize_settings;
@ -300,8 +291,6 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
for (auto & stream : column_streams) for (auto & stream : column_streams)
{ {
stream.second->finalize(); stream.second->finalize();
if (sync)
stream.second->sync();
stream.second->addToChecksums(checksums); stream.second->addToChecksums(checksums);
} }

View File

@ -24,7 +24,7 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation, void write(const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block) override; const Block & primary_key_block, const Block & skip_indexes_block) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override; void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns); IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns);

View File

@ -30,10 +30,7 @@ struct MergeTreeWriterSettings
size_t aio_threshold; size_t aio_threshold;
bool can_use_adaptive_granularity; bool can_use_adaptive_granularity;
bool blocks_are_granules_size; bool blocks_are_granules_size;
/// true if we write temporary files during alter.
bool is_writing_temp_files = false;
size_t estimated_size = 0; size_t estimated_size = 0;
/// used when ALTERing columns if we know that array offsets are not altered.
bool skip_offsets = false;
}; };
} }

View File

@ -8,15 +8,14 @@ namespace ErrorCodes
} }
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeDataPartPtr & data_part, const Block & header_, bool sync_, const MergeTreeDataPartPtr & data_part,
CompressionCodecPtr default_codec, bool skip_offsets_, const Block & header_,
CompressionCodecPtr default_codec,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc, const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
WrittenOffsetColumns * offset_columns_, WrittenOffsetColumns * offset_columns_,
const MergeTreeIndexGranularity & index_granularity, const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info, const MergeTreeIndexGranularityInfo * index_granularity_info)
bool is_writing_temp_files) : IMergedBlockOutputStream(data_part), header(header_)
: IMergedBlockOutputStream(data_part),
header(header_), sync(sync_)
{ {
const auto & global_settings = data_part->storage.global_context.getSettings(); const auto & global_settings = data_part->storage.global_context.getSettings();
MergeTreeWriterSettings writer_settings( MergeTreeWriterSettings writer_settings(
@ -24,11 +23,13 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(), index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(),
global_settings.min_bytes_to_use_direct_io); global_settings.min_bytes_to_use_direct_io);
writer_settings.is_writing_temp_files = is_writing_temp_files; writer = data_part->getWriter(
writer_settings.skip_offsets = skip_offsets_; header.getNamesAndTypesList(),
indices_to_recalc,
default_codec,
std::move(writer_settings),
index_granularity);
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc,
default_codec,std::move(writer_settings), index_granularity);
writer->setWrittenOffsetColumns(offset_columns_); writer->setWrittenOffsetColumns(offset_columns_);
writer->initSkipIndices(); writer->initSkipIndices();
} }
@ -62,7 +63,7 @@ MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(MergeTreeData::MutableD
{ {
/// Finish columns serialization. /// Finish columns serialization.
MergeTreeData::DataPart::Checksums checksums; MergeTreeData::DataPart::Checksums checksums;
writer->finishDataSerialization(checksums, sync); writer->finishDataSerialization(checksums);
writer->finishSkipIndicesSerialization(checksums); writer->finishSkipIndicesSerialization(checksums);
auto columns = new_part->getColumns(); auto columns = new_part->getColumns();

View File

@ -11,17 +11,16 @@ class MergeTreeDataPartWriterWide;
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{ {
public: public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
/// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream /// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream. /// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream( MergedColumnOnlyOutputStream(
const MergeTreeDataPartPtr & data_part, const Block & header_, bool sync_, const MergeTreeDataPartPtr & data_part,
CompressionCodecPtr default_codec_, bool skip_offsets_, const Block & header_,
CompressionCodecPtr default_codec_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_, const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
WrittenOffsetColumns * offset_columns_ = nullptr, WrittenOffsetColumns * offset_columns_ = nullptr,
const MergeTreeIndexGranularity & index_granularity = {}, const MergeTreeIndexGranularity & index_granularity = {},
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr, const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
bool is_writing_temp_files = false);
Block getHeader() const override { return header; } Block getHeader() const override { return header; }
void write(const Block & block) override; void write(const Block & block) override;
@ -31,7 +30,6 @@ public:
private: private:
Block header; Block header;
bool sync;
}; };