ClickHouse/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h

163 lines
5.0 KiB
C++
Raw Normal View History

2013-04-24 10:31:32 +00:00
#pragma once
2013-09-15 01:10:16 +00:00
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
2014-03-27 17:30:04 +00:00
#include <DB/IO/HashingWriteBuffer.h>
2014-03-09 17:36:01 +00:00
#include <DB/Storages/MergeTree/MergeTreeData.h>
2015-04-16 06:12:35 +00:00
#include <DB/DataStreams/IBlockOutputStream.h>
2013-04-24 10:31:32 +00:00
2013-09-15 01:10:16 +00:00
2013-04-24 10:31:32 +00:00
namespace DB
{
class IMergedBlockOutputStream : public IBlockOutputStream
2013-04-24 10:31:32 +00:00
{
public:
IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionMethod compression_method_,
size_t aio_threshold_);
2013-08-24 08:01:19 +00:00
protected:
using OffsetColumns = std::set<std::string>;
2013-04-24 10:31:32 +00:00
struct ColumnStream
{
ColumnStream(
const String & escaped_column_name_,
const String & data_path,
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
size_t max_compress_block_size,
CompressionMethod compression_method,
size_t estimated_size,
size_t aio_threshold);
2014-03-27 17:30:04 +00:00
String escaped_column_name;
std::string data_file_extension;
std::string marks_file_extension;
2014-04-14 13:08:26 +00:00
/// compressed -> compressed_buf -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
2014-04-14 13:08:26 +00:00
HashingWriteBuffer plain_hashing;
2014-03-27 17:30:04 +00:00
CompressedWriteBuffer compressed_buf;
2014-04-14 13:13:20 +00:00
HashingWriteBuffer compressed;
2014-04-14 13:08:26 +00:00
/// marks -> marks_file
WriteBufferFromFile marks_file;
2014-03-27 17:30:04 +00:00
HashingWriteBuffer marks;
2013-09-15 01:10:16 +00:00
void finalize();
void sync();
void addToChecksums(MergeTreeData::DataPart::Checksums & checksums, String name = "");
2013-04-24 10:31:32 +00:00
};
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
void addStream(const String & path, const String & name, const IDataType & type, size_t estimated_size = 0, size_t level = 0, String filename = "");
void addNullStream(const String & path, const String & name, size_t estimated_size, String filename);
2013-04-24 10:31:32 +00:00
/// Записать данные одного столбца.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level = 0);
2014-03-09 17:36:01 +00:00
MergeTreeData & storage;
ColumnStreams column_streams;
ColumnStreams null_streams;
/// Смещение до первой строчки блока, для которой надо записать индекс.
size_t index_offset = 0;
2014-04-08 15:29:12 +00:00
size_t min_compress_block_size;
size_t max_compress_block_size;
size_t aio_threshold;
CompressionMethod compression_method;
};
/** Для записи одного куска.
* Данные относятся к одному месяцу, и пишутся в один кускок.
*/
class MergedBlockOutputStream : public IMergedBlockOutputStream
{
public:
MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method);
MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
size_t aio_threshold_);
std::string getPartPath() const;
2016-01-28 16:06:57 +00:00
/// Если данные заранее отсортированы.
void write(const Block & block) override;
/** Если данные не отсортированы, но мы заранее вычислили перестановку, после которой они станут сортированными.
* Этот метод используется для экономии оперативки, так как не нужно держать одновременно два блока - исходный и отсортированный.
*/
void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation);
2014-03-27 17:30:04 +00:00
void writeSuffix() override;
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums();
MergeTreeData::DataPart::Index & getIndex();
/// Сколько засечек уже записано.
size_t marksCount();
private:
void init();
/** Если задана permutation, то переставляет значения в столбцах при записи.
* Это нужно, чтобы не держать целый блок в оперативке для его сортировки.
*/
void writeImpl(const Block & block, const IColumn::Permutation * permutation);
private:
2014-03-13 12:48:07 +00:00
NamesAndTypesList columns_list;
String part_path;
2014-03-13 12:48:07 +00:00
size_t marks_count = 0;
std::unique_ptr<WriteBufferFromFile> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
MergeTreeData::DataPart::Index index_columns;
2013-04-24 10:31:32 +00:00
};
/// Записывает только те, столбцы, что лежат в block
class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
{
public:
MergedColumnOnlyOutputStream(MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method);
void write(const Block & block) override;
void writeSuffix() override;
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums();
private:
String part_path;
bool initialized = false;
2014-03-05 16:28:24 +00:00
bool sync;
};
2013-09-26 19:16:43 +00:00
}