ClickHouse/dbms/include/DB/Storages/MergeTree/MergeTreeData.h

501 lines
23 KiB
C
Raw Normal View History

2014-03-09 17:36:01 +00:00
#pragma once
#include <statdaemons/Increment.h>
#include <DB/Core/SortDescription.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <DB/Storages/IStorage.h>
#include <DB/Storages/MergeTree/ActiveDataPartSet.h>
2014-04-02 07:59:43 +00:00
#include <DB/IO/ReadBufferFromString.h>
2014-04-02 13:45:39 +00:00
#include <DB/Common/escapeForFileName.h>
2014-03-09 17:36:01 +00:00
#include <Poco/RWLock.h>
2014-04-02 13:45:39 +00:00
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
2014-03-09 17:36:01 +00:00
namespace DB
{
/** Структура данных для *MergeTree движков.
* Используется merge tree для инкрементальной сортировки данных.
* Таблица представлена набором сортированных кусков.
* При вставке, данные сортируются по указанному выражению (первичному ключу) и пишутся в новый кусок.
* Куски объединяются в фоне, согласно некоторой эвристике.
* Для каждого куска, создаётся индексный файл, содержащий значение первичного ключа для каждой n-ой строки.
* Таким образом, реализуется эффективная выборка по диапазону первичного ключа.
*
* Дополнительно:
*
* Указывается столбец, содержащий дату.
* Для каждого куска пишется минимальная и максимальная дата.
* (по сути - ещё один индекс)
*
* Данные разделяются по разным месяцам (пишутся в разные куски для разных месяцев).
* Куски для разных месяцев не объединяются - для простоты эксплуатации.
* (дают локальность обновлений, что удобно для синхронизации и бэкапа)
*
* Структура файлов:
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
* Внутри директории с куском:
* checksums.txt - список файлов с их размерами и контрольными суммами.
2014-03-09 17:36:01 +00:00
* primary.idx - индексный файл.
* Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
*
* Имеется несколько режимов работы, определяющих, что делать при мердже:
* - Ordinary - ничего дополнительно не делать;
* - Collapsing - при склейке кусков "схлопывать"
* пары записей с разными значениями sign_column для одного значения первичного ключа.
* (см. CollapsingSortedBlockInputStream.h)
* - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK.
*/
2014-03-13 12:48:07 +00:00
/** Этот класс хранит список кусков и параметры структуры данных.
* Для чтения и изменения данных используются отдельные классы:
2014-03-13 19:36:28 +00:00
* - MergeTreeDataSelectExecutor
2014-03-13 12:48:07 +00:00
* - MergeTreeDataWriter
* - MergeTreeDataMerger
2014-03-09 17:36:01 +00:00
*/
struct MergeTreeSettings
{
2014-04-10 12:30:59 +00:00
/// Опеределяет, насколько разбалансированные объединения мы готовы делать.
/// Чем больше, тем более разбалансированные. Желательно, чтобы было больше, чем 1/max_parts_to_merge_at_once.
double size_ratio_coefficient_to_merge_parts = 0.25;
2014-03-09 17:36:01 +00:00
/// Сколько за раз сливать кусков.
2014-04-10 12:30:59 +00:00
/// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once).
2014-03-09 17:36:01 +00:00
size_t max_parts_to_merge_at_once = 10;
/// Куски настолько большого размера в основном потоке объединять нельзя вообще.
size_t max_rows_to_merge_parts = 100 * 1024 * 1024;
/// Куски настолько большого размера во втором потоке объединять нельзя вообще.
size_t max_rows_to_merge_parts_second = 1024 * 1024;
/// Во столько раз ночью увеличиваем коэффициент.
size_t merge_parts_at_night_inc = 10;
2014-04-11 13:05:17 +00:00
/// Сколько потоков использовать для объединения кусков (для MergeTree).
/// Пул потоков общий на весь сервер.
size_t merging_threads = 6;
2014-03-09 17:36:01 +00:00
2014-04-03 11:48:28 +00:00
/// Сколько потоков использовать для загрузки кусков с других реплик и объединения кусков (для ReplicatedMergeTree).
2014-04-11 13:05:17 +00:00
/// Пул потоков на каждую таблицу свой.
2014-04-03 11:48:28 +00:00
size_t replication_threads = 4;
2014-03-09 17:36:01 +00:00
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read = 20 * 8192;
/// Можно пропускать чтение более чем стольки строк ценой одного seek по файлу.
size_t min_rows_for_seek = 5 * 8192;
/// Если отрезок индекса может содержать нужные ключи, делим его на столько частей и рекурсивно проверяем их.
size_t coarse_index_granularity = 8;
2014-04-03 11:48:28 +00:00
/// Максимальное количество строк на запрос, для использования кэша разжатых данных. Если запрос большой - кэш не используется.
/// (Чтобы большие запросы не вымывали кэш.)
2014-03-09 17:36:01 +00:00
size_t max_rows_to_use_cache = 1024 * 1024;
2014-04-09 15:52:47 +00:00
/// Через сколько секунд удалять ненужные куски.
2014-03-09 17:36:01 +00:00
time_t old_parts_lifetime = 5 * 60;
2014-04-11 16:56:49 +00:00
/// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу.
size_t parts_to_delay_insert = 150;
/// Если в таблице parts_to_delay_insert + k кусков, спать insert_delay_step^k миллисекунд перед вставкой каждого блока.
/// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний.
double insert_delay_step = 1.1;
2014-04-14 10:19:33 +00:00
/// Для скольки блоков, вставленных с непустым insert ID, хранить хеши в ZooKeeper.
size_t replicated_deduplication_window = 10000;
2014-03-09 17:36:01 +00:00
};
class MergeTreeData : public ITableDeclaration
2014-03-09 17:36:01 +00:00
{
public:
/// Описание куска с данными.
struct DataPart : public ActiveDataPartSet::Part
2014-03-09 17:36:01 +00:00
{
/** Контрольные суммы всех не временных файлов.
* Для сжатых файлов хранятся чексумма и размер разжатых данных, чтобы не зависеть от способа сжатия.
*/
struct Checksums
{
struct Checksum
{
2014-04-14 13:08:26 +00:00
size_t file_size;
uint128 file_hash;
bool is_compressed = false;
size_t uncompressed_size;
uint128 uncompressed_hash;
void checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const;
void checkSize(const String & path) const;
};
typedef std::map<String, Checksum> FileChecksums;
2014-03-27 17:30:04 +00:00
FileChecksums files;
/// Проверяет, что множество столбцов и их контрольные суммы совпадают. Если нет - бросает исключение.
2014-04-14 13:08:26 +00:00
/// Если have_uncompressed, для сжатых файлов сравнивает чексуммы разжатых данных. Иначе сравнивает только чексуммы файлов.
void checkEqual(const Checksums & rhs, bool have_uncompressed) const;
/// Проверяет, что в директории есть все нужные файлы правильных размеров. Не проверяет чексуммы.
void checkSizes(const String & path) const;
/// Сериализует и десериализует в человекочитаемом виде.
bool readText(ReadBuffer & in); /// Возвращает false, если чексуммы в слишком старом формате.
void writeText(WriteBuffer & out) const;
2014-03-27 17:30:04 +00:00
bool empty() const
{
return files.empty();
}
2014-04-02 07:59:43 +00:00
2014-04-25 12:43:10 +00:00
/// Контрольная сумма от множества контрольных сумм .bin файлов.
String summaryDataChecksum() const
{
SipHash hash;
/// Пользуемся тем, что итерирование в детерминированном (лексикографическом) порядке.
for (const auto & it : files)
{
const String & name = it.first;
const Checksum & sum = it.second;
if (name.size() < strlen(".bin") || name.substr(name.size() - 4) != ".bin")
continue;
size_t len = name.size();
hash.update(reinterpret_cast<const char *>(&len), sizeof(len));
hash.update(name.data(), len);
hash.update(reinterpret_cast<const char *>(&sum.uncompressed_size), sizeof(sum.uncompressed_size));
hash.update(reinterpret_cast<const char *>(&sum.uncompressed_hash), sizeof(sum.uncompressed_hash));
}
UInt64 lo, hi;
hash.get128(lo, hi);
return DB::toString(lo) + "_" + DB::toString(hi);
}
2014-04-02 10:10:37 +00:00
String toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
2014-04-02 07:59:43 +00:00
static Checksums parse(const String & s)
{
ReadBufferFromString in(s);
Checksums res;
if (!res.readText(in))
throw Exception("Checksums format is too old", ErrorCodes::FORMAT_VERSION_TOO_OLD);
2014-04-02 07:59:43 +00:00
assertEOF(in);
return res;
}
};
2014-04-09 15:52:47 +00:00
DataPart(MergeTreeData & storage_) : storage(storage_), size(0), size_in_bytes(0), remove_time(0) {}
2014-03-13 17:44:00 +00:00
2014-03-14 17:19:38 +00:00
MergeTreeData & storage;
2014-03-09 17:36:01 +00:00
size_t size; /// в количестве засечек.
size_t size_in_bytes; /// размер в байтах, 0 - если не посчитано
time_t modification_time;
2014-04-09 15:52:47 +00:00
mutable time_t remove_time; /// Когда кусок убрали из рабочего набора.
2014-03-09 17:36:01 +00:00
/// Первичный ключ. Всегда загружается в оперативку.
typedef std::vector<Field> Index;
Index index;
Checksums checksums;
2014-03-09 17:36:01 +00:00
/// NOTE можно загружать засечки тоже в оперативку
/// Вычисляем сумарный размер всей директории со всеми файлами
static size_t calcTotalSize(const String &from)
{
Poco::File cur(from);
if (cur.isFile())
return cur.getSize();
std::vector<std::string> files;
cur.list(files);
size_t res = 0;
for (size_t i = 0; i < files.size(); ++i)
res += calcTotalSize(from + files[i]);
return res;
}
2014-04-09 15:52:47 +00:00
void remove() const
2014-03-09 17:36:01 +00:00
{
String from = storage.full_path + name + "/";
String to = storage.full_path + "tmp2_" + name + "/";
Poco::File(from).renameTo(to);
Poco::File(to).remove(true);
}
2014-04-02 07:59:43 +00:00
/// Переименовывает кусок, дописав к имени префикс.
void renameAddPrefix(const String & prefix) const
2014-03-09 17:36:01 +00:00
{
String from = storage.full_path + name + "/";
2014-04-02 07:59:43 +00:00
String to = storage.full_path + prefix + name + "/";
2014-03-09 17:36:01 +00:00
Poco::File f(from);
f.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
f.renameTo(to);
}
2014-04-02 13:45:39 +00:00
/// Загрузить индекс и вычислить размер. Если size=0, вычислить его тоже.
2014-03-09 17:36:01 +00:00
void loadIndex()
{
2014-04-02 13:45:39 +00:00
/// Размер - в количестве засечек.
if (!size)
size = Poco::File(storage.full_path + name + "/" + escapeForFileName(storage.columns->front().first) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE;
2014-03-09 17:36:01 +00:00
size_t key_size = storage.sort_descr.size();
index.resize(key_size * size);
String index_path = storage.full_path + name + "/primary.idx";
2014-04-02 13:45:39 +00:00
ReadBufferFromFile index_file(index_path,
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
2014-03-09 17:36:01 +00:00
for (size_t i = 0; i < size; ++i)
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file);
if (!index_file.eof())
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
size_in_bytes = calcTotalSize(storage.full_path + name + "/");
}
/// Прочитать контрольные суммы, если есть.
bool loadChecksums()
{
String path = storage.full_path + name + "/checksums.txt";
if (!Poco::File(path).exists())
return false;
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
2014-04-18 10:37:26 +00:00
if (checksums.readText(file))
assertEOF(file);
return true;
}
2014-04-18 11:05:30 +00:00
void checkNotBroken()
{
String path = storage.full_path + name;
if (!checksums.empty())
{
checksums.checkSizes(path + "/");
}
else
{
/// Проверяем, что первичный ключ непуст.
Poco::File index_file(path + "/primary.idx");
if (!index_file.exists() || index_file.getSize() == 0)
throw Exception("Part " + path + " is broken: primary key is empty.", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
/// Проверяем, что все засечки непусты и имеют одинаковый размер.
ssize_t marks_size = -1;
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
{
Poco::File marks_file(path + "/" + escapeForFileName(it->first) + ".mrk");
/// При добавлении нового столбца в таблицу файлы .mrk не создаются. Не будем ничего удалять.
if (!marks_file.exists())
continue;
if (marks_size == -1)
{
marks_size = marks_file.getSize();
if (0 == marks_size)
throw Exception("Part " + path + " is broken: " + marks_file.path() + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
else
{
if (static_cast<ssize_t>(marks_file.getSize()) != marks_size)
throw Exception("Part " + path + " is broken: marks have different sizes.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}
}
}
2014-03-09 17:36:01 +00:00
};
2014-03-14 17:19:38 +00:00
typedef std::shared_ptr<DataPart> MutableDataPartPtr;
/// После добавление в рабочее множество DataPart нельзя изменять.
typedef std::shared_ptr<const DataPart> DataPartPtr;
2014-03-09 17:36:01 +00:00
struct DataPartPtrLess { bool operator() (const DataPartPtr & lhs, const DataPartPtr & rhs) const { return *lhs < *rhs; } };
typedef std::set<DataPartPtr, DataPartPtrLess> DataParts;
2014-03-13 12:48:07 +00:00
typedef std::vector<DataPartPtr> DataPartsVector;
2014-03-09 17:36:01 +00:00
2014-03-13 12:48:07 +00:00
/// Режим работы. См. выше.
enum Mode
2014-03-09 17:36:01 +00:00
{
2014-03-13 12:48:07 +00:00
Ordinary,
Collapsing,
Summing,
2014-05-26 16:11:20 +00:00
Aggregating,
2014-03-13 12:48:07 +00:00
};
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
* (корректность имён и путей не проверяется)
* состоящую из указанных столбцов.
*
* primary_expr_ast - выражение для сортировки;
* date_column_name - имя столбца с датой;
* index_granularity - на сколько строчек пишется одно значение индекса.
*/
MergeTreeData( const String & full_path_, NamesAndTypesListPtr columns_,
const Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
2014-04-08 07:58:53 +00:00
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
2014-03-13 12:48:07 +00:00
size_t index_granularity_,
Mode mode_,
const String & sign_column_,
2014-05-08 07:12:01 +00:00
const MergeTreeSettings & settings_,
const String & log_name_);
2014-03-09 17:36:01 +00:00
2014-03-13 12:48:07 +00:00
std::string getModePrefix() const;
std::string getSignColumnName() const { return sign_column; }
bool supportsSampling() const { return !!sampling_expression; }
bool supportsFinal() const { return !sign_column.empty(); }
bool supportsPrewhere() const { return true; }
UInt64 getMaxDataPartIndex();
2014-05-22 10:37:17 +00:00
std::string getTableName() const { throw Exception("Logical error: calling method getTableName of not a table.",
ErrorCodes::LOGICAL_ERROR); }
2014-03-13 12:48:07 +00:00
const NamesAndTypesList & getColumnsList() const { return *columns; }
2014-03-13 12:48:07 +00:00
String getFullPath() const { return full_path; }
2014-03-13 12:48:07 +00:00
2014-05-08 07:12:01 +00:00
String getLogName() const { return log_name; }
2014-03-13 17:44:00 +00:00
/** Возвращает копию списка, чтобы снаружи можно было не заботиться о блокировках.
*/
DataParts getDataParts();
2014-04-09 16:32:32 +00:00
DataParts getAllDataParts();
2014-05-16 15:55:57 +00:00
/** Максимальное количество кусков в одном месяце.
*/
size_t getMaxPartsCountForMonth();
2014-03-13 17:44:00 +00:00
2014-04-03 11:48:28 +00:00
/** Возвращает кусок с указанным именем или кусок, покрывающий его. Если такого нет, возвращает nullptr.
2014-04-09 15:52:47 +00:00
* Если including_inactive, просматриваются также неактивные куски (all_data_parts).
* При including_inactive, нахождение куска гарантируется только если есть кусок, совпадающий с part_name;
* строго покрывающий кусок в некоторых случаях может не найтись.
2014-03-13 17:44:00 +00:00
*/
2014-04-09 15:52:47 +00:00
DataPartPtr getContainingPart(const String & part_name, bool including_inactive = false);
2014-03-13 17:44:00 +00:00
/** Переименовывает временный кусок в постоянный и добавляет его в рабочий набор.
2014-03-14 17:19:38 +00:00
* Если increment!=nullptr, индекс куска берется из инкремента. Иначе индекс куска не меняется.
2014-04-07 15:45:46 +00:00
* Предполагается, что кусок не пересекается с существующими.
*/
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment = nullptr);
/** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски.
* Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке).
2014-03-13 17:44:00 +00:00
*/
2014-04-07 15:45:46 +00:00
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr);
2014-03-13 17:44:00 +00:00
2014-04-02 07:59:43 +00:00
/** Переименовывает кусок в prefix_кусок и убирает его из рабочего набора.
* Лучше использовать только когда никто не может читать или писать этот кусок
* (например, при инициализации таблицы).
*/
2014-04-02 10:10:37 +00:00
void renameAndDetachPart(DataPartPtr part, const String & prefix);
/** Удаляет кусок из рабочего набора. clearOldParts удалит его файлы, если на него никто не ссылается.
2014-03-13 17:44:00 +00:00
*/
2014-04-02 10:10:37 +00:00
void removePart(DataPartPtr part);
2014-03-13 17:44:00 +00:00
2014-04-09 15:52:47 +00:00
/** Удалить неактуальные куски. Возвращает имена удаленных кусков.
2014-03-13 17:44:00 +00:00
*/
2014-04-09 15:52:47 +00:00
Strings clearOldParts();
2014-03-13 17:44:00 +00:00
/** После вызова dropAllData больше ничего вызывать нельзя.
2014-03-13 19:14:25 +00:00
* Удаляет директорию с данными и сбрасывает кеши разжатых блоков и засечек.
2014-03-13 17:44:00 +00:00
*/
void dropAllData();
2014-03-13 19:14:25 +00:00
/** Перемещает всю директорию с данными.
2014-03-13 19:07:17 +00:00
* Сбрасывает кеши разжатых блоков и засечек.
2014-03-13 17:44:00 +00:00
* Нужно вызывать под залоченным lockStructure().
*/
void setPath(const String & full_path);
void alter(const ASTAlterQuery::Parameters & params);
2014-03-20 13:00:42 +00:00
void prepareAlterModify(const ASTAlterQuery::Parameters & params);
void commitAlterModify(const ASTAlterQuery::Parameters & params);
2014-03-14 17:03:52 +00:00
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; }
2014-03-13 12:48:07 +00:00
const Context & context;
2014-03-14 17:03:52 +00:00
const String date_column_name;
const ASTPtr sampling_expression;
const size_t index_granularity;
2014-03-13 12:48:07 +00:00
/// Режим работы - какие дополнительные действия делать при мердже.
2014-03-14 17:03:52 +00:00
const Mode mode;
2014-03-13 12:48:07 +00:00
/// Для схлопывания записей об изменениях, если используется Collapsing режим работы.
2014-03-14 17:03:52 +00:00
const String sign_column;
2014-03-13 12:48:07 +00:00
2014-03-14 17:03:52 +00:00
const MergeTreeSettings settings;
2014-03-13 12:48:07 +00:00
2014-03-22 14:44:44 +00:00
const ASTPtr primary_expr_ast;
2014-03-14 17:03:52 +00:00
private:
2014-03-13 12:48:07 +00:00
ExpressionActionsPtr primary_expr;
SortDescription sort_descr;
Block primary_key_sample;
String full_path;
2014-03-09 17:36:01 +00:00
2014-03-13 12:48:07 +00:00
NamesAndTypesListPtr columns;
2014-03-09 17:36:01 +00:00
2014-05-08 07:12:01 +00:00
String log_name;
2014-03-13 12:48:07 +00:00
Logger * log;
2014-03-09 17:36:01 +00:00
/** Актуальное множество кусков с данными. */
DataParts data_parts;
Poco::FastMutex data_parts_mutex;
/** Множество всех кусков с данными, включая уже слитые в более крупные, но ещё не удалённые. Оно обычно небольшое (десятки элементов).
2014-03-13 12:48:07 +00:00
* Ссылки на кусок есть отсюда, из списка актуальных кусков и из каждого потока чтения, который его сейчас использует.
2014-03-09 17:36:01 +00:00
* То есть, если количество ссылок равно 1 - то кусок не актуален и не используется прямо сейчас, и его можно удалить.
*/
DataParts all_data_parts;
Poco::FastMutex all_data_parts_mutex;
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
void loadDataParts();
void removeColumnFiles(String column_name, bool remove_array_size_files);
2014-03-09 17:36:01 +00:00
/// Определить, не битые ли данные в директории. Проверяет индекс и засечеки, но не сами данные.
bool isBrokenPart(const String & path);
void createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column);
};
}