ClickHouse/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h

233 lines
7.5 KiB
C++
Raw Normal View History

2013-04-24 10:31:32 +00:00
#pragma once
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <IO/WriteBufferFromFile.h>
2018-12-28 18:15:26 +00:00
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
2013-04-24 10:31:32 +00:00
#include <Columns/ColumnArray.h>
2013-09-15 01:10:16 +00:00
2013-04-24 10:31:32 +00:00
namespace DB
{
class IMergedBlockOutputStream : public IBlockOutputStream
2013-04-24 10:31:32 +00:00
{
public:
IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
2018-12-21 12:17:30 +00:00
CompressionCodecPtr default_codec_,
2018-11-29 13:50:34 +00:00
size_t aio_threshold_,
2018-11-30 15:36:10 +00:00
bool blocks_are_granules_size_,
const MergeTreeIndexGranularity & index_granularity_);
2013-08-24 08:01:19 +00:00
using WrittenOffsetColumns = std::set<std::string>;
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
struct ColumnStream
{
ColumnStream(
const String & escaped_column_name_,
const String & data_path,
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
2018-10-11 02:57:48 +00:00
const CompressionCodecPtr & compression_codec,
size_t max_compress_block_size,
size_t estimated_size,
size_t aio_threshold);
String escaped_column_name;
std::string data_file_extension;
std::string marks_file_extension;
2014-04-14 13:08:26 +00:00
/// compressed -> compressed_buf -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer compressed;
2014-04-14 13:08:26 +00:00
/// marks -> marks_file
WriteBufferFromFile marks_file;
HashingWriteBuffer marks;
2013-09-15 01:10:16 +00:00
void finalize();
void sync();
void addToChecksums(MergeTreeData::DataPart::Checksums & checksums);
};
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
2018-10-11 02:57:48 +00:00
void addStreams(const String & path, const String & name, const IDataType & type,
const CompressionCodecPtr & codec, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
2017-04-16 15:00:33 +00:00
/// Write data of one column.
2018-11-30 15:36:10 +00:00
/// Return how many marks were written and
/// how many rows were written for last mark
std::pair<size_t, size_t> writeColumn(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t from_mark
);
size_t writeSingleGranule(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
IDataType::SerializeBinaryBulkSettings & serialize_settings,
size_t from_row,
size_t number_of_rows,
bool write_marks);
void fillIndexGranularity(const Block & block);
MergeTreeData & storage;
ColumnStreams column_streams;
2017-04-16 15:00:33 +00:00
/// The offset to the first row of the block for which you want to write the index.
size_t index_offset = 0;
2014-04-08 15:29:12 +00:00
size_t min_compress_block_size;
size_t max_compress_block_size;
size_t aio_threshold;
2018-11-30 15:36:10 +00:00
size_t current_mark = 0;
2018-11-15 14:06:54 +00:00
2018-11-30 15:36:10 +00:00
const std::string marks_file_extension;
const size_t mark_size_in_bytes;
const bool blocks_are_granules_size;
MergeTreeIndexGranularity index_granularity;
2018-11-29 13:50:34 +00:00
2018-11-30 15:36:10 +00:00
const bool compute_granularity;
2018-12-21 12:17:30 +00:00
CompressionCodecPtr codec;
};
2017-04-16 15:00:33 +00:00
/** To write one part.
* The data refers to one partition, and is written in one part.
*/
class MergedBlockOutputStream final : public IMergedBlockOutputStream
{
public:
MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
2019-03-18 12:02:33 +00:00
CompressionCodecPtr default_codec_,
2019-03-25 13:55:24 +00:00
bool blocks_are_granules_size_ = false);
MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
2018-12-21 12:17:30 +00:00
CompressionCodecPtr default_codec_,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
2018-11-30 15:36:10 +00:00
size_t aio_threshold_,
2019-03-25 13:55:24 +00:00
bool blocks_are_granules_size_ = false);
std::string getPartPath() const;
2016-01-28 16:06:57 +00:00
Block getHeader() const override { return storage.getSampleBlock(); }
2017-04-16 15:00:33 +00:00
/// If the data is pre-sorted.
void write(const Block & block) override;
/** If the data is not sorted, but we have previously calculated the permutation, that will sort it.
2017-04-16 15:00:33 +00:00
* This method is used to save RAM, since you do not need to keep two blocks at once - the original one and the sorted one.
*/
void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation);
2014-03-27 17:30:04 +00:00
void writeSuffix() override;
2018-11-15 14:06:54 +00:00
/// Finilize writing part and fill inner structures
void writeSuffixAndFinalizePart(
MergeTreeData::MutableDataPartPtr & new_part,
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
const MergeTreeIndexGranularity & getIndexGranularity() const
2018-11-30 15:36:10 +00:00
{
return index_granularity;
}
private:
void init();
/** If `permutation` is given, it rearranges the values in the columns when writing.
2017-04-16 15:00:33 +00:00
* This is necessary to not keep the whole block in the RAM to sort it.
*/
void writeImpl(const Block & block, const IColumn::Permutation * permutation);
private:
NamesAndTypesList columns_list;
SerializationStates serialization_states;
String part_path;
2014-03-13 12:48:07 +00:00
size_t rows_count = 0;
std::unique_ptr<WriteBufferFromFile> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
MutableColumns index_columns;
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
std::vector<std::unique_ptr<ColumnStream>> skip_indices_streams;
2019-03-08 19:52:21 +00:00
MergeTreeIndexAggregators skip_indices_aggregators;
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
std::vector<size_t> skip_index_filling;
2013-04-24 10:31:32 +00:00
};
/// Writes only those columns that are in `header`
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{
public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
2018-10-16 21:25:45 +00:00
/// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
2018-12-21 12:17:30 +00:00
CompressionCodecPtr default_codec_, bool skip_offsets_,
2018-11-30 15:36:10 +00:00
WrittenOffsetColumns & already_written_offset_columns,
const MergeTreeIndexGranularity & index_granularity_);
2014-03-27 17:30:04 +00:00
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writeSuffix() override;
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums();
private:
Block header;
SerializationStates serialization_states;
String part_path;
bool initialized = false;
bool sync;
bool skip_offsets;
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns & already_written_offset_columns;
};
2013-09-26 19:16:43 +00:00
}