ClickHouse/dbms/include/DB/Storages/StorageMergeTree.h

329 lines
14 KiB
C
Raw Normal View History

2012-07-16 20:25:19 +00:00
#pragma once
2012-07-17 20:04:39 +00:00
#include <statdaemons/Increment.h>
2012-11-28 08:52:15 +00:00
#include <statdaemons/threadpool.hpp>
2012-07-17 20:04:39 +00:00
#include <DB/Core/SortDescription.h>
2012-07-16 20:25:19 +00:00
#include <DB/Interpreters/Context.h>
2012-07-17 20:04:39 +00:00
#include <DB/Interpreters/Expression.h>
2012-07-16 20:25:19 +00:00
#include <DB/Storages/IStorage.h>
namespace DB
{
/** Движок, использующий merge tree для инкрементальной сортировки данных.
* Таблица представлена набором сортированных кусков.
* При вставке, данные сортируются по указанному выражению (первичному ключу) и пишутся в новый кусок.
* Куски объединяются в фоне, согласно некоторой эвристике.
* Для каждого куска, создаётся индексный файл, содержащий значение первичного ключа для каждой n-ой строки.
* Таким образом, реализуется эффективная выборка по диапазону первичного ключа.
*
* Дополнительно:
*
* Указывается столбец, содержащий дату.
* Для каждого куска пишется минимальная и максимальная дата.
* (по сути - ещё один индекс)
*
* Данные разделяются по разным месяцам (пишутся в разные куски для разных месяцев).
* Куски для разных месяцев не объединяются - для простоты эксплуатации.
* (дают локальность обновлений, что удобно для синхронизации и бэкапа)
*
* Структура файлов:
* / increment.txt - файл, содержащий одно число, увеличивающееся на 1 - для генерации идентификаторов кусков.
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
2012-07-18 19:16:16 +00:00
* / min-date _ max-date _ min-id _ max-id _ level / primary.idx - индексный файл.
2012-07-16 20:25:19 +00:00
* Внутри директории с куском:
* Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
2012-08-16 17:27:40 +00:00
*
* Если указано sign_column, то при склейке кусков, также "схлопываются"
* пары записей с разными значениями sign_column для одного значения первичного ключа.
2012-08-16 17:27:40 +00:00
* (см. CollapsingSortedBlockInputStream.h)
2012-07-16 20:25:19 +00:00
*/
2012-08-29 20:23:19 +00:00
struct StorageMergeTreeSettings
{
2012-11-29 10:50:17 +00:00
/// Набор кусков разрешено объединить, если среди них максимальный размер не более чем во столько раз больше суммы остальных.
double max_size_ratio_to_merge_parts;
/// Сколько за раз сливать кусков.
/// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once), так что не следует делать это число слишком большим.
/// С другой стороны, чтобы слияния точно не могли зайти в тупик, нужно хотя бы
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts).
size_t max_parts_to_merge_at_once;
2012-12-06 09:45:09 +00:00
2012-08-29 20:23:19 +00:00
/// Куски настолько большого размера объединять нельзя вообще.
size_t max_rows_to_merge_parts;
2012-11-28 08:52:15 +00:00
/// Сколько потоков использовать для объединения кусков.
size_t merging_threads;
2012-08-29 20:23:19 +00:00
2012-11-29 10:50:17 +00:00
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read;
2012-12-06 09:45:09 +00:00
/// Можно пропускать чтение более чем стольки строк ценой одного seek по файлу.
size_t min_rows_for_seek;
2012-12-10 10:23:10 +00:00
/// Если отрезок индекса может содержать нужные ключи, делим его на столько частей и рекурсивно проверяем их.
size_t coarse_index_granularity;
/// Если кусок содержит хотя бы столько строк, его содержимое можно не коллапсировать с содержимым остальных кусков при SELECT ... FINAL.
size_t min_rows_to_skip_collapsing;
2012-08-29 20:23:19 +00:00
StorageMergeTreeSettings() :
2012-11-29 10:50:17 +00:00
max_size_ratio_to_merge_parts(5),
max_parts_to_merge_at_once(10),
2012-11-28 08:52:15 +00:00
max_rows_to_merge_parts(100 * 1024 * 1024),
2012-11-29 10:50:17 +00:00
merging_threads(2),
2012-12-06 09:45:09 +00:00
min_rows_for_concurrent_read(20 * 8192),
2012-12-10 10:23:10 +00:00
min_rows_for_seek(5 * 8192),
coarse_index_granularity(8),
min_rows_to_skip_collapsing(90 * 1024 * 1024) {}
2012-12-06 09:45:09 +00:00
};
/// Пара засечек, определяющая диапазон строк в куске. Именно, диапазон имеет вид [begin * index_granularity, end * index_granularity).
struct MarkRange
{
2012-12-06 11:10:05 +00:00
size_t begin;
size_t end;
2012-12-06 09:45:09 +00:00
MarkRange() {}
2012-12-06 11:10:05 +00:00
MarkRange(size_t begin_, size_t end_) : begin(begin_), end(end_) {}
2012-08-29 20:23:19 +00:00
};
2012-12-06 09:45:09 +00:00
typedef std::vector<MarkRange> MarkRanges;
2012-08-29 20:23:19 +00:00
2012-07-16 20:25:19 +00:00
class StorageMergeTree : public IStorage
{
2012-07-21 06:47:17 +00:00
friend class MergeTreeBlockInputStream;
2012-07-17 20:04:39 +00:00
friend class MergeTreeBlockOutputStream;
2012-07-30 20:32:36 +00:00
friend class MergedBlockOutputStream;
2012-07-17 20:04:39 +00:00
2012-07-16 20:25:19 +00:00
public:
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
* (корректность имён и путей не проверяется)
* состоящую из указанных столбцов.
*
2012-11-30 00:52:45 +00:00
* primary_expr_ast - выражение для сортировки;
2012-07-16 20:25:19 +00:00
* date_column_name - имя столбца с датой;
* index_granularity - на сколько строчек пишется одно значение индекса.
*/
static StoragePtr create(const String & path_, const String & name_, NamesAndTypesListPtr columns_,
2012-07-17 20:04:39 +00:00
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
2012-12-12 15:45:08 +00:00
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
2012-07-31 16:37:20 +00:00
size_t index_granularity_,
const String & sign_column_ = "",
2012-08-29 20:23:19 +00:00
const StorageMergeTreeSettings & settings_ = StorageMergeTreeSettings());
2012-07-16 20:25:19 +00:00
2012-07-30 20:32:36 +00:00
~StorageMergeTree();
2012-12-12 23:31:40 +00:00
std::string getName() const { return sign_column.empty() ? "MergeTree" : "CollapsingMergeTree"; }
2012-07-16 20:25:19 +00:00
std::string getTableName() const { return name; }
std::string getSignColumnName() const { return sign_column; }
2012-12-12 15:45:08 +00:00
bool supportsSampling() const { return !!sampling_expression; }
2013-04-23 11:08:41 +00:00
bool supportsFinal() const { return !sign_column.empty(); }
2012-07-16 20:25:19 +00:00
const NamesAndTypesList & getColumnsList() const { return *columns; }
/** При чтении, выбирается набор кусков, покрывающий нужный диапазон индекса.
*/
2012-07-21 03:45:48 +00:00
BlockInputStreams read(
2012-07-16 20:25:19 +00:00
const Names & column_names,
ASTPtr query,
const Settings & settings,
2012-07-16 20:25:19 +00:00
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
2012-07-21 03:45:48 +00:00
unsigned threads = 1);
2012-07-16 20:25:19 +00:00
/** При записи, данные сортируются и пишутся в новые куски.
*/
BlockOutputStreamPtr write(
ASTPtr query);
/** Выполнить очередной шаг объединения кусков.
*/
2012-07-31 16:37:20 +00:00
bool optimize()
{
2012-09-10 19:05:06 +00:00
merge(1, false);
return true;
2012-07-31 16:37:20 +00:00
}
2012-07-16 20:25:19 +00:00
void dropImpl();
2012-07-16 20:25:19 +00:00
2013-01-23 11:16:32 +00:00
void rename(const String & new_path_to_db, const String & new_name);
2012-07-16 20:25:19 +00:00
private:
String path;
String name;
2012-07-17 20:04:39 +00:00
String full_path;
2012-07-16 20:25:19 +00:00
NamesAndTypesListPtr columns;
2012-07-17 20:04:39 +00:00
Context context;
ASTPtr primary_expr_ast;
2012-07-16 20:25:19 +00:00
String date_column_name;
2012-12-12 15:45:08 +00:00
ASTPtr sampling_expression;
2012-07-16 20:25:19 +00:00
size_t index_granularity;
2012-12-06 13:07:29 +00:00
size_t min_marks_for_seek;
size_t min_marks_for_concurrent_read;
2012-08-16 17:27:40 +00:00
/// Для схлопывания записей об изменениях, если это требуется.
String sign_column;
2012-08-29 20:23:19 +00:00
StorageMergeTreeSettings settings;
2012-07-17 20:04:39 +00:00
2012-07-18 20:14:41 +00:00
SharedPtr<Expression> primary_expr;
2012-07-17 20:04:39 +00:00
SortDescription sort_descr;
2012-07-21 07:02:55 +00:00
Block primary_key_sample;
2012-07-17 20:04:39 +00:00
Increment increment;
2012-07-19 20:32:10 +00:00
Logger * log;
/// Описание куска с данными.
struct DataPart
{
2012-11-28 17:17:17 +00:00
DataPart(StorageMergeTree & storage_) : storage(storage_), currently_merging(false) {}
2012-07-31 20:03:53 +00:00
StorageMergeTree & storage;
2012-07-19 20:32:10 +00:00
Yandex::DayNum_t left_date;
Yandex::DayNum_t right_date;
UInt64 left;
UInt64 right;
2012-11-29 10:50:17 +00:00
/// Уровень игнорируется. Использовался предыдущей эвристикой слияния.
2012-07-19 20:32:10 +00:00
UInt32 level;
std::string name;
size_t size; /// в количестве засечек.
time_t modification_time;
Yandex::DayNum_t left_month;
Yandex::DayNum_t right_month;
2012-11-28 08:52:15 +00:00
/// Смотреть и изменять это поле следует под залоченным data_parts_mutex.
bool currently_merging;
2012-07-19 20:32:10 +00:00
2012-07-23 06:23:29 +00:00
/// NOTE можно загружать индекс и засечки в оперативку
2012-07-19 20:32:10 +00:00
void remove() const
{
2012-07-31 20:03:53 +00:00
String from = storage.full_path + name + "/";
String to = storage.full_path + "tmp2_" + name + "/";
Poco::File(from).renameTo(to);
Poco::File(to).remove(true);
2012-07-19 20:32:10 +00:00
}
bool operator< (const DataPart & rhs) const
{
if (left_month < rhs.left_month)
return true;
if (left_month > rhs.left_month)
return false;
if (right_month < rhs.right_month)
return true;
if (right_month > rhs.right_month)
return false;
if (left < rhs.left)
return true;
if (left > rhs.left)
return false;
if (right < rhs.right)
return true;
if (right > rhs.right)
return false;
if (level < rhs.level)
return true;
return false;
}
/// Содержит другой кусок (получен после объединения другого куска с каким-то ещё)
bool contains(const DataPart & rhs) const
{
return left_month == rhs.left_month /// Куски за разные месяцы не объединяются
&& right_month == rhs.right_month
&& level > rhs.level
&& left_date <= rhs.left_date
&& right_date >= rhs.right_date
&& left <= rhs.left
2012-11-29 16:52:01 +00:00
&& right >= rhs.right;
2012-07-19 20:32:10 +00:00
}
};
2012-07-23 06:23:29 +00:00
typedef SharedPtr<DataPart> DataPartPtr;
2012-08-30 17:43:31 +00:00
struct DataPartPtrLess { bool operator() (const DataPartPtr & lhs, const DataPartPtr & rhs) const { return *lhs < *rhs; } };
typedef std::set<DataPartPtr, DataPartPtrLess> DataParts;
2012-11-28 08:52:15 +00:00
2012-12-06 09:45:09 +00:00
struct RangesInDataPart
2012-11-28 08:52:15 +00:00
{
DataPartPtr data_part;
2012-12-06 09:45:09 +00:00
MarkRanges ranges;
2012-11-30 00:52:45 +00:00
2012-12-06 09:45:09 +00:00
RangesInDataPart() {}
2012-11-30 00:52:45 +00:00
2012-12-06 09:45:09 +00:00
RangesInDataPart(DataPartPtr data_part_)
: data_part(data_part_)
2012-11-28 08:52:15 +00:00
{
}
};
2012-12-06 09:45:09 +00:00
typedef std::vector<RangesInDataPart> RangesInDataParts;
2012-07-23 06:23:29 +00:00
/** Множество всех кусков с данными, включая уже слитые в более крупные, но ещё не удалённые. Оно обычно небольшое (десятки элементов).
* Ссылки на кусок есть отсюда, из списка актуальных кусков, и из каждого потока чтения, который его сейчас использует.
* То есть, если количество ссылок равно 1 - то кусок не актуален и не используется прямо сейчас, и его можно удалить.
*/
DataParts all_data_parts;
Poco::FastMutex all_data_parts_mutex;
/** Актуальное множество кусков с данными. */
2012-08-10 20:04:34 +00:00
DataParts data_parts;
Poco::FastMutex data_parts_mutex;
2012-07-19 20:32:10 +00:00
StorageMergeTree(const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
size_t index_granularity_,
const String & sign_column_ = "",
const StorageMergeTreeSettings & settings_ = StorageMergeTreeSettings());
2012-07-19 20:32:10 +00:00
static String getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
2012-12-06 09:45:09 +00:00
BlockInputStreams spreadMarkRangesAmongThreads(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size);
BlockInputStreams spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size);
2012-12-06 09:45:09 +00:00
/// Создать выражение "Sign == 1".
void createPositiveSignCondition(ExpressionPtr & out_expression, String & out_column);
2012-08-10 20:04:34 +00:00
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
2012-07-19 20:32:10 +00:00
void loadDataParts();
2012-07-21 05:07:14 +00:00
2012-07-23 06:23:29 +00:00
/// Удалить неактуальные куски.
void clearOldParts();
2012-11-28 08:52:15 +00:00
/// Определяет, какие куски нужно объединять, и запускает их слияние в отдельном потоке. Если iterations=0, объединяет, пока это возможно.
2012-09-10 19:05:06 +00:00
void merge(size_t iterations = 1, bool async = true);
2012-11-28 08:52:15 +00:00
/// Если while_can, объединяет в цикле, пока можно; иначе выбирает и объединяет только одну пару кусков.
void mergeThread(bool while_can);
/// Сразу помечает их как currently_merging.
2013-02-14 11:22:56 +00:00
/// Если merge_anything_for_old_months, для кусков за прошедшие месяцы снимается ограничение на соотношение размеров.
bool selectPartsToMerge(std::vector<DataPartPtr> & parts, bool merge_anything_for_old_months);
2012-11-28 08:52:15 +00:00
void mergeParts(std::vector<DataPartPtr> parts);
/// Дождаться, пока фоновые потоки закончат слияния.
void joinMergeThreads();
2012-07-23 06:23:29 +00:00
2012-11-28 08:52:15 +00:00
Poco::SharedPtr<boost::threadpool::pool> merge_threads;
2012-07-16 20:25:19 +00:00
};
}