mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 17:50:47 +00:00
Merge
This commit is contained in:
parent
96fb6da011
commit
58ea3b108b
65
dbms/include/DB/Storages/IColumnsDeclaration.h
Normal file
65
dbms/include/DB/Storages/IColumnsDeclaration.h
Normal file
@ -0,0 +1,65 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/Core/Names.h>
|
||||
#include <DB/Core/NamesAndTypes.h>
|
||||
#include <DB/Core/Exception.h>
|
||||
#include <DB/Core/Block.h>
|
||||
#include <DB/Parsers/ASTAlterQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
/** Описание столбцов таблицы.
|
||||
*/
|
||||
class IColumnsDeclaration
|
||||
{
|
||||
public:
|
||||
/// Имя таблицы. Ни на что не влияет, используется только для сообщений об ошибках.
|
||||
virtual std::string getTableName() const { return ""; }
|
||||
|
||||
/** Получить список имён и типов столбцов таблицы, только невиртуальные.
|
||||
*/
|
||||
virtual const NamesAndTypesList & getColumnsList() const = 0;
|
||||
|
||||
/** Получить описание реального (невиртуального) столбца по его имени.
|
||||
*/
|
||||
virtual NameAndTypePair getRealColumn(const String & column_name) const;
|
||||
|
||||
/** Присутствует ли реальный (невиртуальный) столбец с таким именем.
|
||||
*/
|
||||
virtual bool hasRealColumn(const String & column_name) const;
|
||||
|
||||
/** Получить описание любого столбца по его имени.
|
||||
*/
|
||||
virtual NameAndTypePair getColumn(const String & column_name) const;
|
||||
|
||||
/** Присутствует ли столбец с таким именем.
|
||||
*/
|
||||
virtual bool hasColumn(const String & column_name) const;
|
||||
|
||||
const DataTypePtr getDataTypeByName(const String & column_name) const;
|
||||
|
||||
/** То же самое, но в виде блока-образца.
|
||||
*/
|
||||
Block getSampleBlock() const;
|
||||
|
||||
/** Проверить, что все запрошенные имена есть в таблице и заданы корректно.
|
||||
* (список имён не пустой и имена не повторяются)
|
||||
*/
|
||||
void check(const Names & column_names) const;
|
||||
|
||||
/** Проверить, что блок с данными для записи содержит все столбцы таблицы с правильными типами,
|
||||
* содержит только столбцы таблицы, и все столбцы различны.
|
||||
* Если need_all, еще проверяет, что все столбцы таблицы есть в блоке.
|
||||
*/
|
||||
void check(const Block & block, bool need_all = false) const;
|
||||
|
||||
/// реализация alter, модифицирующая список столбцов.
|
||||
static void alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context);
|
||||
|
||||
virtual ~IColumnsDeclaration() {}
|
||||
};
|
||||
|
||||
}
|
@ -13,7 +13,8 @@
|
||||
#include <DB/Parsers/ASTAlterQuery.h>
|
||||
#include <DB/Interpreters/Settings.h>
|
||||
#include <DB/Storages/StoragePtr.h>
|
||||
#include "DatabaseDropper.h"
|
||||
#include <DB/Storages/IColumnsDeclaration.h>
|
||||
#include <DB/Storages/DatabaseDropper.h>
|
||||
#include <Poco/File.h>
|
||||
|
||||
|
||||
@ -30,41 +31,15 @@ class Context;
|
||||
* - структура хранения данных (сжатие, etc.)
|
||||
* - конкуррентный доступ к данным (блокировки, etc.)
|
||||
*/
|
||||
class IStorage : private boost::noncopyable
|
||||
class IStorage : private boost::noncopyable, public IColumnsDeclaration
|
||||
{
|
||||
public:
|
||||
/// Основное имя типа таблицы (например, StorageWithoutKey).
|
||||
/// Основное имя типа таблицы (например, StorageMergeTree).
|
||||
virtual std::string getName() const = 0;
|
||||
|
||||
/// Имя самой таблицы (например, hits)
|
||||
virtual std::string getTableName() const = 0;
|
||||
|
||||
/** Получить список имён и типов столбцов таблицы, только невиртуальные.
|
||||
*/
|
||||
virtual const NamesAndTypesList & getColumnsList() const = 0;
|
||||
|
||||
/** Получить описание реального (невиртуального) столбца по его имени.
|
||||
*/
|
||||
virtual NameAndTypePair getRealColumn(const String & column_name) const;
|
||||
|
||||
/** Присутствует ли реальный (невиртуальный) столбец с таким именем.
|
||||
*/
|
||||
virtual bool hasRealColumn(const String & column_name) const;
|
||||
|
||||
/** Получить описание любого столбца по его имени.
|
||||
*/
|
||||
virtual NameAndTypePair getColumn(const String & column_name) const;
|
||||
|
||||
/** Присутствует ли столбец с таким именем.
|
||||
*/
|
||||
virtual bool hasColumn(const String & column_name) const;
|
||||
|
||||
const DataTypePtr getDataTypeByName(const String & column_name) const;
|
||||
|
||||
/** То же самое, но в виде блока-образца.
|
||||
*/
|
||||
Block getSampleBlock() const;
|
||||
|
||||
/** Возвращает true, если хранилище получает данные с удалённого сервера или серверов.
|
||||
*/
|
||||
virtual bool isRemote() const { return false; }
|
||||
@ -172,19 +147,6 @@ public:
|
||||
*/
|
||||
virtual void shutdown() {}
|
||||
|
||||
virtual ~IStorage() {}
|
||||
|
||||
/** Проверить, что все запрошенные имена есть в таблице и заданы корректно.
|
||||
* (список имён не пустой и имена не повторяются)
|
||||
*/
|
||||
void check(const Names & column_names) const;
|
||||
|
||||
/** Проверить, что блок с данными для записи содержит все столбцы таблицы с правильными типами,
|
||||
* содержит только столбцы таблицы, и все столбцы различны.
|
||||
* Если need_all, еще проверяет, что все столбцы таблицы есть в блоке.
|
||||
*/
|
||||
void check(const Block & block, bool need_all = false) const;
|
||||
|
||||
/** Возвращает владеющий указатель на себя.
|
||||
*/
|
||||
StoragePtr thisPtr()
|
||||
@ -213,9 +175,7 @@ public:
|
||||
|
||||
protected:
|
||||
IStorage() : drop_on_destroy(false) {}
|
||||
|
||||
/// реализация alter, модифицирующая список столбцов.
|
||||
void alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context) const;
|
||||
|
||||
private:
|
||||
boost::weak_ptr<StoragePtr::Wrapper> this_ptr;
|
||||
};
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/Storages/StorageMergeTree.h>
|
||||
#include <DB/Storages/MergeTree/MergeTreeData.h>
|
||||
#include <DB/Storages/MergeTree/PKCondition.h>
|
||||
|
||||
#include <DB/Storages/MergeTree/MergeTreeReader.h>
|
||||
@ -18,7 +18,7 @@ public:
|
||||
/// (например, поток, сливаящий куски). В таком случае сам storage должен следить, чтобы не удалить данные, пока их читают.
|
||||
MergeTreeBlockInputStream(const String & path_, /// Путь к куску
|
||||
size_t block_size_, const Names & column_names_,
|
||||
StorageMergeTree & storage_, const StorageMergeTree::DataPartPtr & owned_data_part_,
|
||||
MergeTreeData & storage_, const MergeTreeData::DataPartPtr & owned_data_part_,
|
||||
const MarkRanges & mark_ranges_, StoragePtr owned_storage, bool use_uncompressed_cache_,
|
||||
ExpressionActionsPtr prewhere_actions_, String prewhere_column_)
|
||||
: IProfilingBlockInputStream(owned_storage),
|
||||
@ -74,8 +74,8 @@ public:
|
||||
|
||||
/// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона.
|
||||
static MarkRanges markRangesFromPkRange(
|
||||
const StorageMergeTree::DataPart::Index & index,
|
||||
StorageMergeTree & storage,
|
||||
const MergeTreeData::DataPart::Index & index,
|
||||
MergeTreeData & storage,
|
||||
PKCondition & key_condition)
|
||||
{
|
||||
MarkRanges res;
|
||||
@ -321,8 +321,8 @@ private:
|
||||
Names column_names;
|
||||
NameSet column_name_set;
|
||||
Names pre_column_names;
|
||||
StorageMergeTree & storage;
|
||||
const StorageMergeTree::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
|
||||
MergeTreeData & storage;
|
||||
const MergeTreeData::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
|
||||
MarkRanges all_mark_ranges; /// В каких диапазонах засечек читать. В порядке возрастания номеров.
|
||||
MarkRanges remaining_mark_ranges; /// В каких диапазонах засечек еще не прочли.
|
||||
/// В порядке убывания номеров, чтобы можно было выбрасывать из конца.
|
||||
|
527
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
Normal file
527
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
Normal file
@ -0,0 +1,527 @@
|
||||
#pragma once
|
||||
|
||||
#include <statdaemons/Increment.h>
|
||||
#include <statdaemons/threadpool.hpp>
|
||||
|
||||
#include <DB/Core/SortDescription.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Interpreters/ExpressionActions.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <Poco/RWLock.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Структура данных для *MergeTree движков.
|
||||
* Используется merge tree для инкрементальной сортировки данных.
|
||||
* Таблица представлена набором сортированных кусков.
|
||||
* При вставке, данные сортируются по указанному выражению (первичному ключу) и пишутся в новый кусок.
|
||||
* Куски объединяются в фоне, согласно некоторой эвристике.
|
||||
* Для каждого куска, создаётся индексный файл, содержащий значение первичного ключа для каждой n-ой строки.
|
||||
* Таким образом, реализуется эффективная выборка по диапазону первичного ключа.
|
||||
*
|
||||
* Дополнительно:
|
||||
*
|
||||
* Указывается столбец, содержащий дату.
|
||||
* Для каждого куска пишется минимальная и максимальная дата.
|
||||
* (по сути - ещё один индекс)
|
||||
*
|
||||
* Данные разделяются по разным месяцам (пишутся в разные куски для разных месяцев).
|
||||
* Куски для разных месяцев не объединяются - для простоты эксплуатации.
|
||||
* (дают локальность обновлений, что удобно для синхронизации и бэкапа)
|
||||
*
|
||||
* Структура файлов:
|
||||
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
|
||||
* Внутри директории с куском:
|
||||
* primary.idx - индексный файл.
|
||||
* Column.bin - данные столбца
|
||||
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
|
||||
*
|
||||
* Имеется несколько режимов работы, определяющих, что делать при мердже:
|
||||
* - Ordinary - ничего дополнительно не делать;
|
||||
* - Collapsing - при склейке кусков "схлопывать"
|
||||
* пары записей с разными значениями sign_column для одного значения первичного ключа.
|
||||
* (см. CollapsingSortedBlockInputStream.h)
|
||||
* - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK.
|
||||
*/
|
||||
|
||||
/// NOTE: Следующее пока не правда. Сейчас тут практически весь StorageMergeTree. Лишние части нужно перенести отсюда в StorageMergeTree.
|
||||
|
||||
/** Этот класс отвечает за хранение локальных данных всех *MergeTree движков.
|
||||
* - Поддерживает набор кусков на диске. Синхронизирует доступ к ним, поддерживает в памяти их список.
|
||||
* - Полностью выполняет запросы SELECT.
|
||||
* - Сам не принимает решений об изменении данных.
|
||||
* - Умеет дававть рекомендации:
|
||||
* - Говорить, какие куски нужно удалить, потому что они покрыты другими кусками.
|
||||
* - Выбирать набор кусков для слияния.
|
||||
* При этом нужна внешняя информация о том, какие куски с какими разрешено объединять.
|
||||
* - Умеет изменять данные по запросу:
|
||||
* - Записать новый кусок с данными из блока.
|
||||
* - Слить указанные куски.
|
||||
* - Сделать ALTER.
|
||||
*/
|
||||
|
||||
struct MergeTreeSettings
|
||||
{
|
||||
/// Набор кусков разрешено объединить, если среди них максимальный размер не более чем во столько раз больше суммы остальных.
|
||||
double max_size_ratio_to_merge_parts = 5;
|
||||
|
||||
/// Сколько за раз сливать кусков.
|
||||
/// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once), так что не следует делать это число слишком большим.
|
||||
/// С другой стороны, чтобы слияния точно не могли зайти в тупик, нужно хотя бы
|
||||
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts).
|
||||
size_t max_parts_to_merge_at_once = 10;
|
||||
|
||||
/// Куски настолько большого размера в основном потоке объединять нельзя вообще.
|
||||
size_t max_rows_to_merge_parts = 100 * 1024 * 1024;
|
||||
|
||||
/// Куски настолько большого размера во втором потоке объединять нельзя вообще.
|
||||
size_t max_rows_to_merge_parts_second = 1024 * 1024;
|
||||
|
||||
/// Во столько раз ночью увеличиваем коэффициент.
|
||||
size_t merge_parts_at_night_inc = 10;
|
||||
|
||||
/// Сколько потоков использовать для объединения кусков.
|
||||
size_t merging_threads = 2;
|
||||
|
||||
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
|
||||
size_t min_rows_for_concurrent_read = 20 * 8192;
|
||||
|
||||
/// Можно пропускать чтение более чем стольки строк ценой одного seek по файлу.
|
||||
size_t min_rows_for_seek = 5 * 8192;
|
||||
|
||||
/// Если отрезок индекса может содержать нужные ключи, делим его на столько частей и рекурсивно проверяем их.
|
||||
size_t coarse_index_granularity = 8;
|
||||
|
||||
/** Максимальное количество строк на запрос, для использования кэша разжатых данных. Если запрос большой - кэш не используется.
|
||||
* (Чтобы большие запросы не вымывали кэш.)
|
||||
*/
|
||||
size_t max_rows_to_use_cache = 1024 * 1024;
|
||||
|
||||
/// Через сколько секунд удалять old_куски.
|
||||
time_t old_parts_lifetime = 5 * 60;
|
||||
};
|
||||
|
||||
/// Пара засечек, определяющая диапазон строк в куске. Именно, диапазон имеет вид [begin * index_granularity, end * index_granularity).
|
||||
struct MarkRange
|
||||
{
|
||||
size_t begin;
|
||||
size_t end;
|
||||
|
||||
MarkRange() {}
|
||||
MarkRange(size_t begin_, size_t end_) : begin(begin_), end(end_) {}
|
||||
};
|
||||
|
||||
typedef std::vector<MarkRange> MarkRanges;
|
||||
|
||||
|
||||
class MergeTreeData : public IColumnsDeclaration
|
||||
{
|
||||
friend class MergeTreeReader;
|
||||
friend class MergeTreeBlockInputStream;
|
||||
friend class MergeTreeDataBlockOutputStream;
|
||||
friend class IMergedBlockOutputStream;
|
||||
friend class MergedBlockOutputStream;
|
||||
friend class MergedColumnOnlyOutputStream;
|
||||
|
||||
public:
|
||||
/// Режим работы. См. выше.
|
||||
enum Mode
|
||||
{
|
||||
Ordinary,
|
||||
Collapsing,
|
||||
Summing,
|
||||
};
|
||||
|
||||
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
|
||||
* (корректность имён и путей не проверяется)
|
||||
* состоящую из указанных столбцов.
|
||||
*
|
||||
* primary_expr_ast - выражение для сортировки;
|
||||
* date_column_name - имя столбца с датой;
|
||||
* index_granularity - на сколько строчек пишется одно значение индекса.
|
||||
*/
|
||||
MergeTreeData( StoragePtr owning_storage_, const String & path_, const String & name_, NamesAndTypesListPtr columns_,
|
||||
const Context & context_,
|
||||
ASTPtr & primary_expr_ast_,
|
||||
const String & date_column_name_,
|
||||
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
|
||||
size_t index_granularity_,
|
||||
Mode mode_,
|
||||
const String & sign_column_,
|
||||
const MergeTreeSettings & settings_);
|
||||
|
||||
void shutdown();
|
||||
~MergeTreeData();
|
||||
|
||||
std::string getModePrefix() const
|
||||
{
|
||||
switch (mode)
|
||||
{
|
||||
case Ordinary: return "";
|
||||
case Collapsing: return "Collapsing";
|
||||
case Summing: return "Summing";
|
||||
|
||||
default:
|
||||
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
std::string getTableName() const { return name; }
|
||||
std::string getSignColumnName() const { return sign_column; }
|
||||
bool supportsSampling() const { return !!sampling_expression; }
|
||||
bool supportsFinal() const { return !sign_column.empty(); }
|
||||
bool supportsPrewhere() const { return true; }
|
||||
|
||||
const NamesAndTypesList & getColumnsList() const { return *columns; }
|
||||
|
||||
/** При чтении, выбирается набор кусков, покрывающий нужный диапазон индекса.
|
||||
*/
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
ASTPtr query,
|
||||
const Settings & settings,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size = DEFAULT_BLOCK_SIZE,
|
||||
unsigned threads = 1);
|
||||
|
||||
/** При записи, данные сортируются и пишутся в новые куски.
|
||||
*/
|
||||
BlockOutputStreamPtr write(ASTPtr query);
|
||||
|
||||
/** Выполнить очередной шаг объединения кусков.
|
||||
*/
|
||||
bool optimize()
|
||||
{
|
||||
merge(1, false, true);
|
||||
return true;
|
||||
}
|
||||
|
||||
void dropImpl();
|
||||
|
||||
void rename(const String & new_path_to_db, const String & new_name);
|
||||
|
||||
/// Метод ALTER позволяет добавлять и удалять столбцы.
|
||||
/// Метод ALTER нужно применять, когда обращения к базе приостановлены.
|
||||
/// Например если параллельно с INSERT выполнить ALTER, то ALTER выполниться, а INSERT бросит исключение
|
||||
void alter(const ASTAlterQuery::Parameters & params);
|
||||
|
||||
class BigLock
|
||||
{
|
||||
public:
|
||||
BigLock(MergeTreeData & storage) : merge_lock(storage.merge_lock),
|
||||
write_lock(storage.write_lock), read_lock(storage.read_lock)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
Poco::ScopedWriteRWLock merge_lock;
|
||||
Poco::ScopedWriteRWLock write_lock;
|
||||
Poco::ScopedWriteRWLock read_lock;
|
||||
};
|
||||
|
||||
typedef Poco::SharedPtr<BigLock> BigLockPtr;
|
||||
BigLockPtr lockAllOperations()
|
||||
{
|
||||
return new BigLock(*this);
|
||||
}
|
||||
|
||||
private:
|
||||
StoragePtr owning_storage;
|
||||
|
||||
String path;
|
||||
String name;
|
||||
String full_path;
|
||||
NamesAndTypesListPtr columns;
|
||||
|
||||
const Context & context;
|
||||
ASTPtr primary_expr_ast;
|
||||
String date_column_name;
|
||||
ASTPtr sampling_expression;
|
||||
size_t index_granularity;
|
||||
|
||||
size_t min_marks_for_seek;
|
||||
size_t min_marks_for_concurrent_read;
|
||||
size_t max_marks_to_use_cache;
|
||||
|
||||
/// Режим работы - какие дополнительные действия делать при мердже.
|
||||
Mode mode;
|
||||
/// Для схлопывания записей об изменениях, если используется Collapsing режим работы.
|
||||
String sign_column;
|
||||
|
||||
MergeTreeSettings settings;
|
||||
|
||||
ExpressionActionsPtr primary_expr;
|
||||
SortDescription sort_descr;
|
||||
Block primary_key_sample;
|
||||
|
||||
Increment increment;
|
||||
|
||||
Logger * log;
|
||||
volatile bool shutdown_called;
|
||||
|
||||
/// Регулярное выражение соответсвующее названию директории с кусочками
|
||||
Poco::RegularExpression file_name_regexp;
|
||||
|
||||
/// Описание куска с данными.
|
||||
struct DataPart
|
||||
{
|
||||
DataPart(MergeTreeData & storage_) : storage(storage_), size_in_bytes(0), currently_merging(false) {}
|
||||
|
||||
MergeTreeData & storage;
|
||||
DayNum_t left_date;
|
||||
DayNum_t right_date;
|
||||
UInt64 left;
|
||||
UInt64 right;
|
||||
/// Уровень игнорируется. Использовался предыдущей эвристикой слияния.
|
||||
UInt32 level;
|
||||
|
||||
std::string name;
|
||||
size_t size; /// в количестве засечек.
|
||||
size_t size_in_bytes; /// размер в байтах, 0 - если не посчитано
|
||||
time_t modification_time;
|
||||
|
||||
DayNum_t left_month;
|
||||
DayNum_t right_month;
|
||||
|
||||
/// Смотреть и изменять это поле следует под залоченным data_parts_mutex.
|
||||
bool currently_merging;
|
||||
|
||||
/// Первичный ключ. Всегда загружается в оперативку.
|
||||
typedef std::vector<Field> Index;
|
||||
Index index;
|
||||
|
||||
/// NOTE можно загружать засечки тоже в оперативку
|
||||
|
||||
/// Вычисляем сумарный размер всей директории со всеми файлами
|
||||
static size_t calcTotalSize(const String &from)
|
||||
{
|
||||
Poco::File cur(from);
|
||||
if (cur.isFile())
|
||||
return cur.getSize();
|
||||
std::vector<std::string> files;
|
||||
cur.list(files);
|
||||
size_t res = 0;
|
||||
for (size_t i = 0; i < files.size(); ++i)
|
||||
res += calcTotalSize(from + files[i]);
|
||||
return res;
|
||||
}
|
||||
|
||||
void remove() const
|
||||
{
|
||||
String from = storage.full_path + name + "/";
|
||||
String to = storage.full_path + "tmp2_" + name + "/";
|
||||
|
||||
Poco::File(from).renameTo(to);
|
||||
Poco::File(to).remove(true);
|
||||
}
|
||||
|
||||
void renameToOld() const
|
||||
{
|
||||
String from = storage.full_path + name + "/";
|
||||
String to = storage.full_path + "old_" + name + "/";
|
||||
|
||||
Poco::File f(from);
|
||||
f.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
|
||||
f.renameTo(to);
|
||||
}
|
||||
|
||||
bool operator< (const DataPart & rhs) const
|
||||
{
|
||||
if (left_month < rhs.left_month)
|
||||
return true;
|
||||
if (left_month > rhs.left_month)
|
||||
return false;
|
||||
if (right_month < rhs.right_month)
|
||||
return true;
|
||||
if (right_month > rhs.right_month)
|
||||
return false;
|
||||
|
||||
if (left < rhs.left)
|
||||
return true;
|
||||
if (left > rhs.left)
|
||||
return false;
|
||||
if (right < rhs.right)
|
||||
return true;
|
||||
if (right > rhs.right)
|
||||
return false;
|
||||
|
||||
if (level < rhs.level)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Содержит другой кусок (получен после объединения другого куска с каким-то ещё)
|
||||
bool contains(const DataPart & rhs) const
|
||||
{
|
||||
return left_month == rhs.left_month /// Куски за разные месяцы не объединяются
|
||||
&& right_month == rhs.right_month
|
||||
&& level > rhs.level
|
||||
&& left_date <= rhs.left_date
|
||||
&& right_date >= rhs.right_date
|
||||
&& left <= rhs.left
|
||||
&& right >= rhs.right;
|
||||
}
|
||||
|
||||
/// Загрузить индекс и вычислить размер.
|
||||
void loadIndex()
|
||||
{
|
||||
size_t key_size = storage.sort_descr.size();
|
||||
index.resize(key_size * size);
|
||||
|
||||
String index_path = storage.full_path + name + "/primary.idx";
|
||||
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
for (size_t j = 0; j < key_size; ++j)
|
||||
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file);
|
||||
|
||||
if (!index_file.eof())
|
||||
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
|
||||
|
||||
size_in_bytes = calcTotalSize(storage.full_path + name + "/");
|
||||
}
|
||||
};
|
||||
|
||||
typedef SharedPtr<DataPart> DataPartPtr;
|
||||
struct DataPartPtrLess { bool operator() (const DataPartPtr & lhs, const DataPartPtr & rhs) const { return *lhs < *rhs; } };
|
||||
typedef std::set<DataPartPtr, DataPartPtrLess> DataParts;
|
||||
|
||||
struct RangesInDataPart
|
||||
{
|
||||
DataPartPtr data_part;
|
||||
MarkRanges ranges;
|
||||
|
||||
RangesInDataPart() {}
|
||||
|
||||
RangesInDataPart(DataPartPtr data_part_)
|
||||
: data_part(data_part_)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
/// Пока существует, помечает части как currently_merging и пересчитывает общий объем сливаемых данных.
|
||||
/// Вероятно, что части будут помечены заранее.
|
||||
class CurrentlyMergingPartsTagger
|
||||
{
|
||||
public:
|
||||
std::vector<DataPartPtr> parts;
|
||||
Poco::FastMutex & data_mutex;
|
||||
|
||||
CurrentlyMergingPartsTagger(const std::vector<DataPartPtr> & parts_, Poco::FastMutex & data_mutex_) : parts(parts_), data_mutex(data_mutex_)
|
||||
{
|
||||
/// Здесь не лочится мьютекс, так как конструктор вызывается внутри selectPartsToMerge, где он уже залочен
|
||||
/// Poco::ScopedLock<Poco::FastMutex> lock(data_mutex);
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
parts[i]->currently_merging = true;
|
||||
MergeTreeData::total_size_of_currently_merging_parts += parts[i]->size_in_bytes;
|
||||
}
|
||||
}
|
||||
|
||||
~CurrentlyMergingPartsTagger()
|
||||
{
|
||||
Poco::ScopedLock<Poco::FastMutex> lock(data_mutex);
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
parts[i]->currently_merging = false;
|
||||
MergeTreeData::total_size_of_currently_merging_parts -= parts[i]->size_in_bytes;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Сумарный размер currently_merging кусочков в байтах.
|
||||
/// Нужно чтобы оценить количество места на диске, которое может понадобится для завершения этих мерджей.
|
||||
static size_t total_size_of_currently_merging_parts;
|
||||
|
||||
typedef std::vector<RangesInDataPart> RangesInDataParts;
|
||||
|
||||
/** @warning Если берете насколько блокировок, то берите их везде в одинаковом порядке - в том же как они написаны в этом файле */
|
||||
/** Взятие этого лока на запись, запрещает мердж */
|
||||
Poco::RWLock merge_lock;
|
||||
|
||||
/** Взятие этого лока на запись, запрещает запись */
|
||||
Poco::RWLock write_lock;
|
||||
|
||||
/** Взятие этого лока на запись, запрещает чтение */
|
||||
Poco::RWLock read_lock;
|
||||
|
||||
/** Актуальное множество кусков с данными. */
|
||||
DataParts data_parts;
|
||||
Poco::FastMutex data_parts_mutex;
|
||||
|
||||
/** Множество всех кусков с данными, включая уже слитые в более крупные, но ещё не удалённые. Оно обычно небольшое (десятки элементов).
|
||||
* Ссылки на кусок есть отсюда, из списка актуальных кусков, и из каждого потока чтения, который его сейчас использует.
|
||||
* То есть, если количество ссылок равно 1 - то кусок не актуален и не используется прямо сейчас, и его можно удалить.
|
||||
*/
|
||||
DataParts all_data_parts;
|
||||
Poco::FastMutex all_data_parts_mutex;
|
||||
|
||||
static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
|
||||
|
||||
BlockInputStreams spreadMarkRangesAmongThreads(
|
||||
RangesInDataParts parts,
|
||||
size_t threads,
|
||||
const Names & column_names,
|
||||
size_t max_block_size,
|
||||
bool use_uncompressed_cache,
|
||||
ExpressionActionsPtr prewhere_actions,
|
||||
const String & prewhere_column);
|
||||
|
||||
BlockInputStreams spreadMarkRangesAmongThreadsFinal(
|
||||
RangesInDataParts parts,
|
||||
size_t threads,
|
||||
const Names & column_names,
|
||||
size_t max_block_size,
|
||||
bool use_uncompressed_cache,
|
||||
ExpressionActionsPtr prewhere_actions,
|
||||
const String & prewhere_column);
|
||||
|
||||
/// Создать выражение "Sign == 1".
|
||||
void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column);
|
||||
|
||||
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
|
||||
void loadDataParts();
|
||||
|
||||
/// Удалить неактуальные куски.
|
||||
void clearOldParts();
|
||||
|
||||
/** Определяет, какие куски нужно объединять, и запускает их слияние в отдельном потоке. Если iterations = 0, объединяет, пока это возможно.
|
||||
* Если aggressive - выбрать куски не обращая внимание на соотношение размеров и их новизну (для запроса OPTIMIZE).
|
||||
*/
|
||||
void merge(size_t iterations = 1, bool async = true, bool aggressive = false);
|
||||
|
||||
/// Если while_can, объединяет в цикле, пока можно; иначе выбирает и объединяет только одну пару кусков.
|
||||
void mergeThread(bool while_can, bool aggressive);
|
||||
|
||||
/// Сразу помечает их как currently_merging.
|
||||
/// Если merge_anything_for_old_months, для кусков за прошедшие месяцы снимается ограничение на соотношение размеров.
|
||||
bool selectPartsToMerge(Poco::SharedPtr<CurrentlyMergingPartsTagger> & what, bool merge_anything_for_old_months, bool aggressive);
|
||||
|
||||
void mergeParts(Poco::SharedPtr<CurrentlyMergingPartsTagger> & what);
|
||||
|
||||
/// Дождаться, пока фоновые потоки закончат слияния.
|
||||
void joinMergeThreads();
|
||||
|
||||
Poco::SharedPtr<boost::threadpool::pool> merge_threads;
|
||||
|
||||
void removeColumnFiles(String column_name);
|
||||
|
||||
/// Возвращает true если имя директории совпадает с форматом имени директории кусочков
|
||||
bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const;
|
||||
|
||||
/// Кладет в DataPart данные из имени кусочка.
|
||||
void parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part);
|
||||
|
||||
/// Определить, не битые ли данные в директории. Проверяет индекс и засечеки, но не сами данные.
|
||||
bool isBrokenPart(const String & path);
|
||||
|
||||
/// Найти самые большие old_куски, из которых получен этот кусок.
|
||||
/// Переименовать их, убрав префикс old_ и вернуть их имена.
|
||||
Strings tryRestorePart(const String & path, const String & file_name, Strings & old_parts);
|
||||
|
||||
void createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column);
|
||||
};
|
||||
|
||||
}
|
@ -7,16 +7,16 @@
|
||||
|
||||
#include <DB/Interpreters/sortBlock.h>
|
||||
|
||||
#include <DB/Storages/StorageMergeTree.h>
|
||||
#include <DB/Storages/MergeTree/MergeTreeData.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class MergeTreeBlockOutputStream : public IBlockOutputStream
|
||||
|
||||
class MergeTreeDataBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
MergeTreeBlockOutputStream(StoragePtr owned_storage) : IBlockOutputStream(owned_storage), storage(dynamic_cast<StorageMergeTree &>(*owned_storage)), flags(O_TRUNC | O_CREAT | O_WRONLY)
|
||||
MergeTreeDataBlockOutputStream(MergeTreeData & data, StoragePtr owned_storage) : IBlockOutputStream(owned_storage), storage(data), flags(O_TRUNC | O_CREAT | O_WRONLY)
|
||||
{
|
||||
}
|
||||
|
||||
@ -25,7 +25,7 @@ public:
|
||||
Poco::ScopedReadRWLock write_lock(storage.write_lock);
|
||||
|
||||
storage.check(block, true);
|
||||
|
||||
|
||||
DateLUTSingleton & date_lut = DateLUTSingleton::instance();
|
||||
|
||||
size_t rows = block.rows();
|
||||
@ -82,7 +82,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
StorageMergeTree & storage;
|
||||
MergeTreeData & storage;
|
||||
|
||||
const int flags;
|
||||
|
||||
@ -130,7 +130,7 @@ private:
|
||||
LOG_TRACE(storage.log, "Writing index.");
|
||||
|
||||
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
|
||||
StorageMergeTree::DataPart::Index index_vec;
|
||||
MergeTreeData::DataPart::Index index_vec;
|
||||
index_vec.reserve(part_size * storage.sort_descr.size());
|
||||
|
||||
{
|
||||
@ -183,7 +183,7 @@ private:
|
||||
String part_name = storage.getPartName(DayNum_t(min_date), DayNum_t(max_date), part_id, part_id, 0);
|
||||
String part_res_path = storage.full_path + part_name + "/";
|
||||
|
||||
StorageMergeTree::DataPartPtr new_data_part = new StorageMergeTree::DataPart(storage);
|
||||
MergeTreeData::DataPartPtr new_data_part = new MergeTreeData::DataPart(storage);
|
||||
new_data_part->left_date = DayNum_t(min_date);
|
||||
new_data_part->right_date = DayNum_t(max_date);
|
||||
new_data_part->left = part_id;
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/Storages/StorageMergeTree.h>
|
||||
#include <DB/Storages/MergeTree/MergeTreeData.h>
|
||||
#include <DB/DataTypes/IDataType.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
#include <DB/Core/NamesAndTypes.h>
|
||||
@ -22,7 +22,7 @@ class MergeTreeReader
|
||||
{
|
||||
public:
|
||||
MergeTreeReader(const String & path_, /// Путь к куску
|
||||
const Names & columns_names_, bool use_uncompressed_cache_, StorageMergeTree & storage_)
|
||||
const Names & columns_names_, bool use_uncompressed_cache_, MergeTreeData & storage_)
|
||||
: path(path_), column_names(columns_names_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_)
|
||||
{
|
||||
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
|
||||
@ -220,7 +220,7 @@ private:
|
||||
FileStreams streams;
|
||||
Names column_names;
|
||||
bool use_uncompressed_cache;
|
||||
StorageMergeTree & storage;
|
||||
MergeTreeData & storage;
|
||||
|
||||
void addStream(const String & name, const IDataType & type, size_t level = 0)
|
||||
{
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <DB/IO/WriteBufferFromFile.h>
|
||||
#include <DB/IO/CompressedWriteBuffer.h>
|
||||
|
||||
#include <DB/Storages/StorageMergeTree.h>
|
||||
#include <DB/Storages/MergeTree/MergeTreeData.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -11,7 +11,7 @@ namespace DB
|
||||
class IMergedBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
IMergedBlockOutputStream(StorageMergeTree & storage_) : storage(storage_), index_offset(0)
|
||||
IMergedBlockOutputStream(MergeTreeData & storage_) : storage(storage_), index_offset(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -182,7 +182,7 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
StorageMergeTree & storage;
|
||||
MergeTreeData & storage;
|
||||
|
||||
ColumnStreams column_streams;
|
||||
|
||||
@ -196,7 +196,7 @@ protected:
|
||||
class MergedBlockOutputStream : public IMergedBlockOutputStream
|
||||
{
|
||||
public:
|
||||
MergedBlockOutputStream(StorageMergeTree & storage_,
|
||||
MergedBlockOutputStream(MergeTreeData & storage_,
|
||||
UInt16 min_date, UInt16 max_date, UInt64 min_part_id, UInt64 max_part_id, UInt32 level)
|
||||
: IMergedBlockOutputStream(storage_), marks_count(0)
|
||||
{
|
||||
@ -299,7 +299,7 @@ typedef Poco::SharedPtr<MergedBlockOutputStream> MergedBlockOutputStreamPtr;
|
||||
class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
|
||||
{
|
||||
public:
|
||||
MergedColumnOnlyOutputStream(StorageMergeTree & storage_, String part_path_, bool sync_ = false) :
|
||||
MergedColumnOnlyOutputStream(MergeTreeData & storage_, String part_path_, bool sync_ = false) :
|
||||
IMergedBlockOutputStream(storage_), part_path(part_path_), initialized(false), sync(sync_)
|
||||
{
|
||||
}
|
||||
|
@ -1,27 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Отвечает за хранение локальных данных всех *MergeTree движков.
|
||||
* - Поддерживает набор кусков на диске. Синхронизирует доступ к ним, поддерживает в памяти их список.
|
||||
* - Полностью выполняет запросы SELECT.
|
||||
* - Сам не принимает решений об изменении данных.
|
||||
* - Умеет двавть рекомендации:
|
||||
* - Говорить, какие куски нужно удалить, потому что они покрыты другими кусками.
|
||||
* - Выбирать набор кусков для слияния.
|
||||
* При этом нужна внешняя информация о том, какие куски с какими разрешено объединять.
|
||||
* - Умеет изменять данные по запросу:
|
||||
* - Записать новый кусок с данными из блока.
|
||||
* - Слить указанные куски.
|
||||
* - Сделать ALTER.
|
||||
*/
|
||||
class MergeTreeData
|
||||
{
|
||||
public:
|
||||
|
||||
private:
|
||||
|
||||
};
|
||||
|
||||
}
|
@ -1,123 +1,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <statdaemons/Increment.h>
|
||||
#include <statdaemons/threadpool.hpp>
|
||||
|
||||
#include <DB/Core/SortDescription.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Interpreters/ExpressionActions.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <Poco/RWLock.h>
|
||||
|
||||
#include <DB/Storages/MergeTree/MergeTreeData.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Движок, использующий merge tree для инкрементальной сортировки данных.
|
||||
* Таблица представлена набором сортированных кусков.
|
||||
* При вставке, данные сортируются по указанному выражению (первичному ключу) и пишутся в новый кусок.
|
||||
* Куски объединяются в фоне, согласно некоторой эвристике.
|
||||
* Для каждого куска, создаётся индексный файл, содержащий значение первичного ключа для каждой n-ой строки.
|
||||
* Таким образом, реализуется эффективная выборка по диапазону первичного ключа.
|
||||
*
|
||||
* Дополнительно:
|
||||
*
|
||||
* Указывается столбец, содержащий дату.
|
||||
* Для каждого куска пишется минимальная и максимальная дата.
|
||||
* (по сути - ещё один индекс)
|
||||
*
|
||||
* Данные разделяются по разным месяцам (пишутся в разные куски для разных месяцев).
|
||||
* Куски для разных месяцев не объединяются - для простоты эксплуатации.
|
||||
* (дают локальность обновлений, что удобно для синхронизации и бэкапа)
|
||||
*
|
||||
* Структура файлов:
|
||||
* / increment.txt - файл, содержащий одно число, увеличивающееся на 1 - для генерации идентификаторов кусков.
|
||||
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
|
||||
* / min-date _ max-date _ min-id _ max-id _ level / primary.idx - индексный файл.
|
||||
* Внутри директории с куском:
|
||||
* Column.bin - данные столбца
|
||||
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
|
||||
*
|
||||
* Имеется несколько режимов работы, определяющих, что делать при мердже:
|
||||
* - Ordinary - ничего дополнительно не делать;
|
||||
* - Collapsing - при склейке кусков "схлопывать"
|
||||
* пары записей с разными значениями sign_column для одного значения первичного ключа.
|
||||
* (см. CollapsingSortedBlockInputStream.h)
|
||||
* - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK.
|
||||
/** См. описание структуры данных в MergeTreeData.
|
||||
*/
|
||||
|
||||
struct StorageMergeTreeSettings
|
||||
{
|
||||
/// Набор кусков разрешено объединить, если среди них максимальный размер не более чем во столько раз больше суммы остальных.
|
||||
double max_size_ratio_to_merge_parts = 5;
|
||||
|
||||
/// Сколько за раз сливать кусков.
|
||||
/// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once), так что не следует делать это число слишком большим.
|
||||
/// С другой стороны, чтобы слияния точно не могли зайти в тупик, нужно хотя бы
|
||||
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts).
|
||||
size_t max_parts_to_merge_at_once = 10;
|
||||
|
||||
/// Куски настолько большого размера в основном потоке объединять нельзя вообще.
|
||||
size_t max_rows_to_merge_parts = 100 * 1024 * 1024;
|
||||
|
||||
/// Куски настолько большого размера во втором потоке объединять нельзя вообще.
|
||||
size_t max_rows_to_merge_parts_second = 1024 * 1024;
|
||||
|
||||
/// Во столько раз ночью увеличиваем коэффициент.
|
||||
size_t merge_parts_at_night_inc = 10;
|
||||
|
||||
/// Сколько потоков использовать для объединения кусков.
|
||||
size_t merging_threads = 2;
|
||||
|
||||
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
|
||||
size_t min_rows_for_concurrent_read = 20 * 8192;
|
||||
|
||||
/// Можно пропускать чтение более чем стольки строк ценой одного seek по файлу.
|
||||
size_t min_rows_for_seek = 5 * 8192;
|
||||
|
||||
/// Если отрезок индекса может содержать нужные ключи, делим его на столько частей и рекурсивно проверяем их.
|
||||
size_t coarse_index_granularity = 8;
|
||||
|
||||
/** Максимальное количество строк на запрос, для использования кэша разжатых данных. Если запрос большой - кэш не используется.
|
||||
* (Чтобы большие запросы не вымывали кэш.)
|
||||
*/
|
||||
size_t max_rows_to_use_cache = 1024 * 1024;
|
||||
|
||||
/// Через сколько секунд удалять old_куски.
|
||||
time_t old_parts_lifetime = 5 * 60;
|
||||
};
|
||||
|
||||
/// Пара засечек, определяющая диапазон строк в куске. Именно, диапазон имеет вид [begin * index_granularity, end * index_granularity).
|
||||
struct MarkRange
|
||||
{
|
||||
size_t begin;
|
||||
size_t end;
|
||||
|
||||
MarkRange() {}
|
||||
MarkRange(size_t begin_, size_t end_) : begin(begin_), end(end_) {}
|
||||
};
|
||||
|
||||
typedef std::vector<MarkRange> MarkRanges;
|
||||
|
||||
|
||||
class StorageMergeTree : public IStorage
|
||||
{
|
||||
friend class MergeTreeReader;
|
||||
friend class MergeTreeBlockInputStream;
|
||||
friend class MergeTreeBlockOutputStream;
|
||||
friend class IMergedBlockOutputStream;
|
||||
friend class MergedBlockOutputStream;
|
||||
friend class MergedColumnOnlyOutputStream;
|
||||
|
||||
public:
|
||||
/// Режим работы. См. выше.
|
||||
enum Mode
|
||||
{
|
||||
Ordinary,
|
||||
Collapsing,
|
||||
Summing,
|
||||
};
|
||||
|
||||
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
|
||||
* (корректность имён и путей не проверяется)
|
||||
* состоящую из указанных столбцов.
|
||||
@ -132,36 +24,26 @@ public:
|
||||
const String & date_column_name_,
|
||||
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
|
||||
size_t index_granularity_,
|
||||
Mode mode_ = Ordinary,
|
||||
MergeTreeData::Mode mode_ = MergeTreeData::Ordinary,
|
||||
const String & sign_column_ = "",
|
||||
const StorageMergeTreeSettings & settings_ = StorageMergeTreeSettings());
|
||||
const MergeTreeSettings & settings_ = MergeTreeSettings());
|
||||
|
||||
void shutdown();
|
||||
~StorageMergeTree();
|
||||
|
||||
std::string getName() const
|
||||
{
|
||||
switch (mode)
|
||||
{
|
||||
case Ordinary: return "MergeTree";
|
||||
case Collapsing: return "CollapsingMergeTree";
|
||||
case Summing: return "SummingMergeTree";
|
||||
|
||||
default:
|
||||
throw Exception("Unknown mode of operation for StorageMergeTree: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
return data.getModePrefix() + "MergeTree";
|
||||
}
|
||||
|
||||
std::string getTableName() const { return name; }
|
||||
std::string getSignColumnName() const { return sign_column; }
|
||||
bool supportsSampling() const { return !!sampling_expression; }
|
||||
bool supportsFinal() const { return !sign_column.empty(); }
|
||||
bool supportsPrewhere() const { return true; }
|
||||
std::string getTableName() const { return data.getTableName(); }
|
||||
std::string getSignColumnName() const { return data.getSignColumnName(); }
|
||||
bool supportsSampling() const { return data.supportsSampling(); }
|
||||
bool supportsFinal() const { return data.supportsFinal(); }
|
||||
bool supportsPrewhere() const { return data.supportsPrewhere(); }
|
||||
|
||||
const NamesAndTypesList & getColumnsList() const { return *columns; }
|
||||
const NamesAndTypesList & getColumnsList() const { return data.getColumnsList(); }
|
||||
|
||||
/** При чтении, выбирается набор кусков, покрывающий нужный диапазон индекса.
|
||||
*/
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
ASTPtr query,
|
||||
@ -170,20 +52,17 @@ public:
|
||||
size_t max_block_size = DEFAULT_BLOCK_SIZE,
|
||||
unsigned threads = 1);
|
||||
|
||||
/** При записи, данные сортируются и пишутся в новые куски.
|
||||
*/
|
||||
BlockOutputStreamPtr write(ASTPtr query);
|
||||
|
||||
/** Выполнить очередной шаг объединения кусков.
|
||||
*/
|
||||
bool optimize()
|
||||
{
|
||||
merge(1, false, true);
|
||||
return true;
|
||||
return data.optimize();
|
||||
}
|
||||
|
||||
void dropImpl();
|
||||
|
||||
|
||||
void rename(const String & new_path_to_db, const String & new_name);
|
||||
|
||||
/// Метод ALTER позволяет добавлять и удалять столбцы.
|
||||
@ -191,255 +70,12 @@ public:
|
||||
/// Например если параллельно с INSERT выполнить ALTER, то ALTER выполниться, а INSERT бросит исключение
|
||||
void alter(const ASTAlterQuery::Parameters & params);
|
||||
|
||||
class BigLock
|
||||
{
|
||||
public:
|
||||
BigLock(StorageMergeTree & storage) : merge_lock(storage.merge_lock),
|
||||
write_lock(storage.write_lock), read_lock(storage.read_lock)
|
||||
{
|
||||
}
|
||||
typedef MergeTreeData::BigLockPtr BigLockPtr;
|
||||
|
||||
private:
|
||||
Poco::ScopedWriteRWLock merge_lock;
|
||||
Poco::ScopedWriteRWLock write_lock;
|
||||
Poco::ScopedWriteRWLock read_lock;
|
||||
};
|
||||
|
||||
typedef Poco::SharedPtr<BigLock> BigLockPtr;
|
||||
BigLockPtr lockAllOperations()
|
||||
{
|
||||
return new BigLock(*this);
|
||||
}
|
||||
BigLockPtr lockAllOperations() { return data.lockAllOperations(); }
|
||||
|
||||
private:
|
||||
String path;
|
||||
String name;
|
||||
String full_path;
|
||||
NamesAndTypesListPtr columns;
|
||||
|
||||
const Context & context;
|
||||
ASTPtr primary_expr_ast;
|
||||
String date_column_name;
|
||||
ASTPtr sampling_expression;
|
||||
size_t index_granularity;
|
||||
|
||||
size_t min_marks_for_seek;
|
||||
size_t min_marks_for_concurrent_read;
|
||||
size_t max_marks_to_use_cache;
|
||||
|
||||
/// Режим работы - какие дополнительные действия делать при мердже.
|
||||
Mode mode;
|
||||
/// Для схлопывания записей об изменениях, если используется Collapsing режим работы.
|
||||
String sign_column;
|
||||
|
||||
StorageMergeTreeSettings settings;
|
||||
|
||||
ExpressionActionsPtr primary_expr;
|
||||
SortDescription sort_descr;
|
||||
Block primary_key_sample;
|
||||
|
||||
Increment increment;
|
||||
|
||||
Logger * log;
|
||||
volatile bool shutdown_called;
|
||||
|
||||
/// Регулярное выражение соответсвующее названию директории с кусочками
|
||||
Poco::RegularExpression file_name_regexp;
|
||||
|
||||
/// Описание куска с данными.
|
||||
struct DataPart
|
||||
{
|
||||
DataPart(StorageMergeTree & storage_) : storage(storage_), size_in_bytes(0), currently_merging(false) {}
|
||||
|
||||
StorageMergeTree & storage;
|
||||
DayNum_t left_date;
|
||||
DayNum_t right_date;
|
||||
UInt64 left;
|
||||
UInt64 right;
|
||||
/// Уровень игнорируется. Использовался предыдущей эвристикой слияния.
|
||||
UInt32 level;
|
||||
|
||||
std::string name;
|
||||
size_t size; /// в количестве засечек.
|
||||
size_t size_in_bytes; /// размер в байтах, 0 - если не посчитано
|
||||
time_t modification_time;
|
||||
|
||||
DayNum_t left_month;
|
||||
DayNum_t right_month;
|
||||
|
||||
/// Смотреть и изменять это поле следует под залоченным data_parts_mutex.
|
||||
bool currently_merging;
|
||||
|
||||
/// Первичный ключ. Всегда загружается в оперативку.
|
||||
typedef std::vector<Field> Index;
|
||||
Index index;
|
||||
|
||||
/// NOTE можно загружать засечки тоже в оперативку
|
||||
|
||||
/// Вычисляем сумарный размер всей директории со всеми файлами
|
||||
static size_t calcTotalSize(const String &from)
|
||||
{
|
||||
Poco::File cur(from);
|
||||
if (cur.isFile())
|
||||
return cur.getSize();
|
||||
std::vector<std::string> files;
|
||||
cur.list(files);
|
||||
size_t res = 0;
|
||||
for (size_t i = 0; i < files.size(); ++i)
|
||||
res += calcTotalSize(from + files[i]);
|
||||
return res;
|
||||
}
|
||||
|
||||
void remove() const
|
||||
{
|
||||
String from = storage.full_path + name + "/";
|
||||
String to = storage.full_path + "tmp2_" + name + "/";
|
||||
|
||||
Poco::File(from).renameTo(to);
|
||||
Poco::File(to).remove(true);
|
||||
}
|
||||
|
||||
void renameToOld() const
|
||||
{
|
||||
String from = storage.full_path + name + "/";
|
||||
String to = storage.full_path + "old_" + name + "/";
|
||||
|
||||
Poco::File f(from);
|
||||
f.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
|
||||
f.renameTo(to);
|
||||
}
|
||||
|
||||
bool operator< (const DataPart & rhs) const
|
||||
{
|
||||
if (left_month < rhs.left_month)
|
||||
return true;
|
||||
if (left_month > rhs.left_month)
|
||||
return false;
|
||||
if (right_month < rhs.right_month)
|
||||
return true;
|
||||
if (right_month > rhs.right_month)
|
||||
return false;
|
||||
|
||||
if (left < rhs.left)
|
||||
return true;
|
||||
if (left > rhs.left)
|
||||
return false;
|
||||
if (right < rhs.right)
|
||||
return true;
|
||||
if (right > rhs.right)
|
||||
return false;
|
||||
|
||||
if (level < rhs.level)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Содержит другой кусок (получен после объединения другого куска с каким-то ещё)
|
||||
bool contains(const DataPart & rhs) const
|
||||
{
|
||||
return left_month == rhs.left_month /// Куски за разные месяцы не объединяются
|
||||
&& right_month == rhs.right_month
|
||||
&& level > rhs.level
|
||||
&& left_date <= rhs.left_date
|
||||
&& right_date >= rhs.right_date
|
||||
&& left <= rhs.left
|
||||
&& right >= rhs.right;
|
||||
}
|
||||
|
||||
/// Загрузить индекс и вычислить размер.
|
||||
void loadIndex()
|
||||
{
|
||||
size_t key_size = storage.sort_descr.size();
|
||||
index.resize(key_size * size);
|
||||
|
||||
String index_path = storage.full_path + name + "/primary.idx";
|
||||
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
for (size_t j = 0; j < key_size; ++j)
|
||||
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file);
|
||||
|
||||
if (!index_file.eof())
|
||||
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
|
||||
|
||||
size_in_bytes = calcTotalSize(storage.full_path + name + "/");
|
||||
}
|
||||
};
|
||||
|
||||
typedef SharedPtr<DataPart> DataPartPtr;
|
||||
struct DataPartPtrLess { bool operator() (const DataPartPtr & lhs, const DataPartPtr & rhs) const { return *lhs < *rhs; } };
|
||||
typedef std::set<DataPartPtr, DataPartPtrLess> DataParts;
|
||||
|
||||
struct RangesInDataPart
|
||||
{
|
||||
DataPartPtr data_part;
|
||||
MarkRanges ranges;
|
||||
|
||||
RangesInDataPart() {}
|
||||
|
||||
RangesInDataPart(DataPartPtr data_part_)
|
||||
: data_part(data_part_)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
/// Пока существует, помечает части как currently_merging и пересчитывает общий объем сливаемых данных.
|
||||
/// Вероятно, что части будут помечены заранее.
|
||||
class CurrentlyMergingPartsTagger
|
||||
{
|
||||
public:
|
||||
std::vector<DataPartPtr> parts;
|
||||
Poco::FastMutex & data_mutex;
|
||||
|
||||
CurrentlyMergingPartsTagger(const std::vector<DataPartPtr> & parts_, Poco::FastMutex & data_mutex_) : parts(parts_), data_mutex(data_mutex_)
|
||||
{
|
||||
/// Здесь не лочится мьютекс, так как конструктор вызывается внутри selectPartsToMerge, где он уже залочен
|
||||
/// Poco::ScopedLock<Poco::FastMutex> lock(data_mutex);
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
parts[i]->currently_merging = true;
|
||||
StorageMergeTree::total_size_of_currently_merging_parts += parts[i]->size_in_bytes;
|
||||
}
|
||||
}
|
||||
|
||||
~CurrentlyMergingPartsTagger()
|
||||
{
|
||||
Poco::ScopedLock<Poco::FastMutex> lock(data_mutex);
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
parts[i]->currently_merging = false;
|
||||
StorageMergeTree::total_size_of_currently_merging_parts -= parts[i]->size_in_bytes;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Сумарный размер currently_merging кусочков в байтах.
|
||||
/// Нужно чтобы оценить количество места на диске, которое может понадобится для завершения этих мерджей.
|
||||
static size_t total_size_of_currently_merging_parts;
|
||||
|
||||
typedef std::vector<RangesInDataPart> RangesInDataParts;
|
||||
|
||||
/** @warning Если берете насколько блокировок, то берите их везде в одинаковом порядке - в том же как они написаны в этом файле */
|
||||
/** Взятие этого лока на запись, запрещает мердж */
|
||||
Poco::RWLock merge_lock;
|
||||
|
||||
/** Взятие этого лока на запись, запрещает запись */
|
||||
Poco::RWLock write_lock;
|
||||
|
||||
/** Взятие этого лока на запись, запрещает чтение */
|
||||
Poco::RWLock read_lock;
|
||||
|
||||
/** Актуальное множество кусков с данными. */
|
||||
DataParts data_parts;
|
||||
Poco::FastMutex data_parts_mutex;
|
||||
|
||||
/** Множество всех кусков с данными, включая уже слитые в более крупные, но ещё не удалённые. Оно обычно небольшое (десятки элементов).
|
||||
* Ссылки на кусок есть отсюда, из списка актуальных кусков, и из каждого потока чтения, который его сейчас использует.
|
||||
* То есть, если количество ссылок равно 1 - то кусок не актуален и не используется прямо сейчас, и его можно удалить.
|
||||
*/
|
||||
DataParts all_data_parts;
|
||||
Poco::FastMutex all_data_parts_mutex;
|
||||
MergeTreeData data;
|
||||
|
||||
StorageMergeTree(const String & path_, const String & name_, NamesAndTypesListPtr columns_,
|
||||
const Context & context_,
|
||||
@ -447,74 +83,9 @@ private:
|
||||
const String & date_column_name_,
|
||||
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
|
||||
size_t index_granularity_,
|
||||
Mode mode_ = Ordinary,
|
||||
MergeTreeData::Mode mode_ = MergeTreeData::Ordinary,
|
||||
const String & sign_column_ = "",
|
||||
const StorageMergeTreeSettings & settings_ = StorageMergeTreeSettings());
|
||||
|
||||
static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
|
||||
|
||||
BlockInputStreams spreadMarkRangesAmongThreads(
|
||||
RangesInDataParts parts,
|
||||
size_t threads,
|
||||
const Names & column_names,
|
||||
size_t max_block_size,
|
||||
bool use_uncompressed_cache,
|
||||
ExpressionActionsPtr prewhere_actions,
|
||||
const String & prewhere_column);
|
||||
|
||||
BlockInputStreams spreadMarkRangesAmongThreadsFinal(
|
||||
RangesInDataParts parts,
|
||||
size_t threads,
|
||||
const Names & column_names,
|
||||
size_t max_block_size,
|
||||
bool use_uncompressed_cache,
|
||||
ExpressionActionsPtr prewhere_actions,
|
||||
const String & prewhere_column);
|
||||
|
||||
/// Создать выражение "Sign == 1".
|
||||
void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column);
|
||||
|
||||
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
|
||||
void loadDataParts();
|
||||
|
||||
/// Удалить неактуальные куски.
|
||||
void clearOldParts();
|
||||
|
||||
/** Определяет, какие куски нужно объединять, и запускает их слияние в отдельном потоке. Если iterations = 0, объединяет, пока это возможно.
|
||||
* Если aggressive - выбрать куски не обращая внимание на соотношение размеров и их новизну (для запроса OPTIMIZE).
|
||||
*/
|
||||
void merge(size_t iterations = 1, bool async = true, bool aggressive = false);
|
||||
|
||||
/// Если while_can, объединяет в цикле, пока можно; иначе выбирает и объединяет только одну пару кусков.
|
||||
void mergeThread(bool while_can, bool aggressive);
|
||||
|
||||
/// Сразу помечает их как currently_merging.
|
||||
/// Если merge_anything_for_old_months, для кусков за прошедшие месяцы снимается ограничение на соотношение размеров.
|
||||
bool selectPartsToMerge(Poco::SharedPtr<CurrentlyMergingPartsTagger> & what, bool merge_anything_for_old_months, bool aggressive);
|
||||
|
||||
void mergeParts(Poco::SharedPtr<CurrentlyMergingPartsTagger> & what);
|
||||
|
||||
/// Дождаться, пока фоновые потоки закончат слияния.
|
||||
void joinMergeThreads();
|
||||
|
||||
Poco::SharedPtr<boost::threadpool::pool> merge_threads;
|
||||
|
||||
void removeColumnFiles(String column_name);
|
||||
|
||||
/// Возвращает true если имя директории совпадает с форматом имени директории кусочков
|
||||
bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const;
|
||||
|
||||
/// Кладет в DataPart данные из имени кусочка.
|
||||
void parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part);
|
||||
|
||||
/// Определить, не битые ли данные в директории. Проверяет индекс и засечеки, но не сами данные.
|
||||
bool isBrokenPart(const String & path);
|
||||
|
||||
/// Найти самые большие old_куски, из которых получен этот кусок.
|
||||
/// Переименовать их, убрав префикс old_ и вернуть их имена.
|
||||
Strings tryRestorePart(const String & path, const String & file_name, Strings & old_parts);
|
||||
|
||||
void createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column);
|
||||
const MergeTreeSettings & settings_ = MergeTreeSettings());
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,21 +1,16 @@
|
||||
#include <set>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include <sparsehash/dense_hash_set>
|
||||
#include <sparsehash/dense_hash_map>
|
||||
|
||||
#include <DB/Columns/ColumnNested.h>
|
||||
#include <sparsehash/dense_hash_set>
|
||||
#include <DB/Storages/IColumnsDeclaration.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <DB/Parsers/ASTIdentifier.h>
|
||||
#include <DB/Parsers/ASTNameTypePair.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
bool IStorage::hasRealColumn(const String &column_name) const
|
||||
bool IColumnsDeclaration::hasRealColumn(const String &column_name) const
|
||||
{
|
||||
const NamesAndTypesList & real_columns = getColumnsList();
|
||||
for (auto & it : real_columns)
|
||||
@ -25,7 +20,7 @@ bool IStorage::hasRealColumn(const String &column_name) const
|
||||
}
|
||||
|
||||
|
||||
NameAndTypePair IStorage::getRealColumn(const String &column_name) const
|
||||
NameAndTypePair IColumnsDeclaration::getRealColumn(const String &column_name) const
|
||||
{
|
||||
const NamesAndTypesList & real_columns = getColumnsList();
|
||||
for (auto & it : real_columns)
|
||||
@ -35,30 +30,30 @@ NameAndTypePair IStorage::getRealColumn(const String &column_name) const
|
||||
}
|
||||
|
||||
|
||||
bool IStorage::hasColumn(const String &column_name) const
|
||||
bool IColumnsDeclaration::hasColumn(const String &column_name) const
|
||||
{
|
||||
return hasRealColumn(column_name); /// По умолчанию считаем, что виртуальных столбцов в сторадже нет.
|
||||
}
|
||||
|
||||
|
||||
NameAndTypePair IStorage::getColumn(const String &column_name) const
|
||||
NameAndTypePair IColumnsDeclaration::getColumn(const String &column_name) const
|
||||
{
|
||||
return getRealColumn(column_name); /// По умолчанию считаем, что виртуальных столбцов в сторадже нет.
|
||||
}
|
||||
|
||||
|
||||
const DataTypePtr IStorage::getDataTypeByName(const String & column_name) const
|
||||
const DataTypePtr IColumnsDeclaration::getDataTypeByName(const String & column_name) const
|
||||
{
|
||||
const NamesAndTypesList & names_and_types = getColumnsList();
|
||||
for (NamesAndTypesList::const_iterator it = names_and_types.begin(); it != names_and_types.end(); ++it)
|
||||
if (it->first == column_name)
|
||||
return it->second;
|
||||
|
||||
|
||||
throw Exception("There is no column " + column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||
}
|
||||
|
||||
|
||||
Block IStorage::getSampleBlock() const
|
||||
Block IColumnsDeclaration::getSampleBlock() const
|
||||
{
|
||||
Block res;
|
||||
const NamesAndTypesList & names_and_types = getColumnsList();
|
||||
@ -71,7 +66,7 @@ Block IStorage::getSampleBlock() const
|
||||
col.column = col.type->createColumn();
|
||||
res.insert(col);
|
||||
}
|
||||
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -104,10 +99,10 @@ static NamesAndTypesMap getColumnsMap(const NamesAndTypesList & available_column
|
||||
}
|
||||
|
||||
|
||||
void IStorage::check(const Names & column_names) const
|
||||
void IColumnsDeclaration::check(const Names & column_names) const
|
||||
{
|
||||
const NamesAndTypesList & available_columns = getColumnsList();
|
||||
|
||||
|
||||
if (column_names.empty())
|
||||
throw Exception("Empty list of columns queried for table " + getTableName()
|
||||
+ ". There are columns: " + listOfColumns(available_columns),
|
||||
@ -134,14 +129,14 @@ void IStorage::check(const Names & column_names) const
|
||||
}
|
||||
|
||||
|
||||
void IStorage::check(const Block & block, bool need_all) const
|
||||
void IColumnsDeclaration::check(const Block & block, bool need_all) const
|
||||
{
|
||||
const NamesAndTypesList & available_columns = getColumnsList();
|
||||
const NamesAndTypesMap & columns_map = getColumnsMap(available_columns);
|
||||
|
||||
|
||||
typedef std::unordered_set<String> NameSet;
|
||||
NameSet names_in_block;
|
||||
|
||||
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
{
|
||||
const ColumnWithNameAndType & column = block.getByPosition(i);
|
||||
@ -163,7 +158,7 @@ void IStorage::check(const Block & block, bool need_all) const
|
||||
+ ". Column has type " + it->second->getName() + ", got type " + column.type->getName(),
|
||||
ErrorCodes::TYPE_MISMATCH);
|
||||
}
|
||||
|
||||
|
||||
if (need_all && names_in_block.size() < columns_map.size())
|
||||
{
|
||||
for (NamesAndTypesList::const_iterator it = available_columns.begin(); it != available_columns.end(); ++it)
|
||||
@ -174,7 +169,6 @@ void IStorage::check(const Block & block, bool need_all) const
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки
|
||||
static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type)
|
||||
{
|
||||
@ -182,7 +176,7 @@ static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePai
|
||||
return (name_with_dot == name_type.first.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.first);
|
||||
}
|
||||
|
||||
void IStorage::alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context) const
|
||||
void IColumnsDeclaration::alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context)
|
||||
{
|
||||
if (params.type == ASTAlterQuery::ADD)
|
||||
{
|
||||
@ -219,14 +213,14 @@ void IStorage::alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTy
|
||||
else if (params.type == ASTAlterQuery::DROP)
|
||||
{
|
||||
String column_name = dynamic_cast<const ASTIdentifier &>(*(params.column)).name;
|
||||
|
||||
|
||||
/// Удаляем колонки из листа columns
|
||||
bool is_first = true;
|
||||
NamesAndTypesList::iterator column_it;
|
||||
do
|
||||
{
|
||||
column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, column_name, _1));
|
||||
|
||||
|
||||
if (column_it == columns->end())
|
||||
{
|
||||
if (is_first)
|
||||
@ -252,7 +246,7 @@ void IStorage::alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTy
|
||||
column_it->second = data_type;
|
||||
}
|
||||
else
|
||||
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
|
||||
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
}
|
1468
dbms/src/Storages/MergeTree/MergeTreeData.cpp
Normal file
1468
dbms/src/Storages/MergeTree/MergeTreeData.cpp
Normal file
File diff suppressed because it is too large
Load Diff
@ -192,7 +192,7 @@ StoragePtr StorageFactory::get(
|
||||
|
||||
return StorageMergeTree::create(
|
||||
data_path, table_name, columns, context, primary_expr, date_column_name, sampling_expression, index_granularity,
|
||||
name == "SummingMergeTree" ? StorageMergeTree::Summing : StorageMergeTree::Ordinary);
|
||||
name == "SummingMergeTree" ? MergeTreeData::Summing : MergeTreeData::Ordinary);
|
||||
}
|
||||
else if (name == "CollapsingMergeTree")
|
||||
{
|
||||
@ -234,7 +234,7 @@ StoragePtr StorageFactory::get(
|
||||
|
||||
return StorageMergeTree::create(
|
||||
data_path, table_name, columns, context, primary_expr, date_column_name,
|
||||
sampling_expression, index_granularity, StorageMergeTree::Collapsing, sign_column_name);
|
||||
sampling_expression, index_granularity, MergeTreeData::Collapsing, sign_column_name);
|
||||
}
|
||||
else if (name == "SystemNumbers")
|
||||
{
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -37,7 +37,7 @@ DataParts copy(const DataParts &a)
|
||||
|
||||
const int RowsPerSec = 100000;
|
||||
|
||||
StorageMergeTreeSettings settings;
|
||||
MergeTreeSettings settings;
|
||||
|
||||
|
||||
size_t index_granularity = 1;
|
||||
|
Loading…
Reference in New Issue
Block a user