ClickHouse/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h

128 lines
5.3 KiB
C++
Raw Normal View History

2013-11-26 11:55:11 +00:00
#pragma once
2015-04-16 06:12:35 +00:00
#include <DB/Storages/MarkCache.h>
2015-06-24 11:03:53 +00:00
#include <DB/Storages/MergeTree/MarkRange.h>
2014-03-09 17:36:01 +00:00
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Columns/ColumnNullable.h>
2013-11-26 11:55:11 +00:00
#include <DB/Core/NamesAndTypes.h>
#include <DB/IO/CachedCompressedReadBuffer.h>
#include <DB/IO/CompressedReadBufferFromFile.h>
2013-11-26 11:55:11 +00:00
namespace DB
{
2014-03-13 12:48:07 +00:00
namespace ErrorCodes
{
extern const int NOT_FOUND_EXPECTED_DATA_PART;
extern const int MEMORY_LIMIT_EXCEEDED;
}
2016-07-19 10:57:57 +00:00
class IDataType;
2013-11-26 11:55:11 +00:00
/** Умеет читать данные между парой засечек из одного куска. При чтении последовательных отрезков не делает лишних seek-ов.
* При чтении почти последовательных отрезков делает seek-и быстро, не выбрасывая содержимое буфера.
*/
class MergeTreeReader
{
public:
using ValueSizeMap = std::map<std::string, double>;
2015-09-16 17:49:08 +00:00
MergeTreeReader(const String & path, /// Путь к куску
const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
bool save_marks_in_cache,
2015-09-16 17:49:08 +00:00
MergeTreeData & storage, const MarkRanges & all_mark_ranges,
size_t aio_threshold, size_t max_read_buffer_size,
const ValueSizeMap & avg_value_size_hints = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{},
2016-07-19 10:57:57 +00:00
clockid_t clock_type = CLOCK_MONOTONIC_COARSE);
2014-07-23 15:24:45 +00:00
2016-07-19 10:57:57 +00:00
MergeTreeReader(const MergeTreeReader &) = delete;
MergeTreeReader & operator=(const MergeTreeReader &) = delete;
2013-11-26 11:55:11 +00:00
2016-07-19 10:57:57 +00:00
const ValueSizeMap & getAvgValueSizeHints() const;
2015-09-16 17:49:08 +00:00
2013-11-26 11:55:11 +00:00
/** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец.
* Не добавляет столбцы, для которых нет файлов. Чтобы их добавить, нужно вызвать fillMissingColumns.
2015-04-02 03:08:43 +00:00
* В блоке должно быть либо ни одного столбца из columns, либо все, для которых есть файлы.
*/
2016-07-19 10:57:57 +00:00
void readRange(size_t from_mark, size_t to_mark, Block & res);
2013-11-26 11:55:11 +00:00
2015-04-02 03:08:43 +00:00
/** Добавляет в блок недостающие столбцы из ordered_names, состоящие из значений по-умолчанию.
* Недостающие столбцы добавляются в позиции, такие же как в ordered_names.
* Если был добавлен хотя бы один столбец - то все столбцы в блоке переупорядочиваются как в ordered_names.
*/
2016-07-19 10:57:57 +00:00
void fillMissingColumns(Block & res, const Names & ordered_names, const bool always_reorder = false);
2014-12-04 15:50:48 +00:00
2015-04-02 03:08:43 +00:00
/** То же самое, но всегда переупорядочивает столбцы в блоке, как в ordered_names
* (даже если не было недостающих столбцов).
*/
2016-07-19 10:57:57 +00:00
void fillMissingColumnsAndReorder(Block & res, const Names & ordered_names);
2013-11-26 11:55:11 +00:00
private:
2016-07-19 10:57:57 +00:00
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
size_t level = 0);
void readData(const String & name, const IDataType & type, IColumn & column, size_t from_mark, size_t max_rows_to_read,
size_t level = 0, bool read_offsets = true);
void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder);
2013-11-26 11:55:11 +00:00
2016-07-19 10:57:57 +00:00
private:
class Stream
{
public:
2015-07-08 17:59:44 +00:00
Stream(
const String & path_prefix_, const String & extension_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache, bool save_marks_in_cache,
const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size,
2016-07-19 10:57:57 +00:00
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
2013-11-26 11:55:11 +00:00
2016-07-19 10:57:57 +00:00
Stream(Stream &&) = default;
Stream & operator=(Stream &&) = default;
2014-02-11 13:30:42 +00:00
void loadMarks(MarkCache * cache, bool save_in_cache, bool is_null_stream);
2013-11-26 11:55:11 +00:00
2016-07-19 10:57:57 +00:00
void seekToMark(size_t index);
2013-11-26 11:55:11 +00:00
2016-07-19 10:57:57 +00:00
public:
ReadBuffer * data_buffer;
2016-05-04 18:04:36 +00:00
2016-07-19 10:57:57 +00:00
private:
MarkCache::MappedPtr marks;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
std::string path_prefix;
std::string extension;
2013-11-26 11:55:11 +00:00
};
2016-08-05 02:40:45 +00:00
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
2013-11-26 11:55:11 +00:00
2016-07-19 10:57:57 +00:00
private:
2015-09-03 09:17:25 +00:00
/// Используется в качестве подсказки, чтобы уменьшить количество реаллокаций при создании столбца переменной длины.
2015-09-16 17:49:08 +00:00
ValueSizeMap avg_value_size_hints;
2013-11-26 11:55:11 +00:00
String path;
2015-07-08 17:59:44 +00:00
MergeTreeData::DataPartPtr data_part;
2013-11-26 11:55:11 +00:00
FileStreams streams;
2015-07-03 15:08:21 +00:00
/// Запрашиваемые столбцы.
2014-07-17 13:41:47 +00:00
NamesAndTypesList columns;
2015-04-16 06:12:35 +00:00
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
/// Если выставлено в false - при отсутствии засечек в кэше, считавать засечки, но не сохранять их в кэш, чтобы не вымывать оттуда другие данные.
bool save_marks_in_cache;
2015-04-16 06:12:35 +00:00
2014-03-09 17:36:01 +00:00
MergeTreeData & storage;
2015-07-08 17:59:44 +00:00
MarkRanges all_mark_ranges;
size_t aio_threshold;
2015-04-12 04:39:20 +00:00
size_t max_read_buffer_size;
2013-11-26 11:55:11 +00:00
};
}