This commit is contained in:
Alexey Milovidov 2014-07-17 18:38:05 +04:00
commit be0890637c
27 changed files with 744 additions and 432 deletions

View File

@ -72,7 +72,7 @@ protected:
}
/// Функция для отладочного вывода информации
virtual void write()
void write()
{
std::cerr << "file " << file << std::endl;
std::cerr << "name " << name << std::endl;

View File

@ -2,11 +2,15 @@
#include <vector>
#include <string>
#include <unordered_set>
#include <unordered_map>
namespace DB
{
typedef std::vector<std::string> Names;
typedef std::unordered_set<std::string> NameSet;
typedef std::unordered_map<std::string, std::string> NameToNameMap;
}

View File

@ -3,10 +3,14 @@
#include <map>
#include <list>
#include <string>
#include <set>
#include <Poco/SharedPtr.h>
#include <DB/DataTypes/IDataType.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/IO/ReadBufferFromString.h>
#include "Names.h"
namespace DB
@ -33,8 +37,113 @@ struct NameAndTypePair
}
};
typedef std::list<NameAndTypePair> NamesAndTypesList;
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;
typedef std::vector<NameAndTypePair> NamesAndTypes;
class NamesAndTypesList : public std::list<NameAndTypePair>
{
public:
using std::list<NameAndTypePair>::list;
void readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory)
{
DB::assertString("columns format version: 1\n", buf);
size_t count;
DB::readText(count, buf);
DB::assertString(" columns:\n", buf);
resize(count);
for (NameAndTypePair & it : *this)
{
DB::readBackQuotedString(it.name, buf);
DB::assertString(" ", buf);
String type_name;
DB::readString(type_name, buf);
it.type = data_type_factory.get(type_name);
DB::assertString("\n", buf);
}
}
void writeText(WriteBuffer & buf) const
{
DB::writeString("columns format version: 1\n", buf);
DB::writeText(size(), buf);
DB::writeString(" columns:\n", buf);
for (const auto & it : *this)
{
DB::writeBackQuotedString(it.name, buf);
DB::writeChar(' ', buf);
DB::writeString(it.type->getName(), buf);
DB::writeChar('\n', buf);
}
}
String toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
static NamesAndTypesList parse(const String & s, const DataTypeFactory & data_type_factory)
{
ReadBufferFromString in(s);
NamesAndTypesList res;
res.readText(in, data_type_factory);
assertEOF(in);
return res;
}
/// Все элементы rhs должны быть различны.
bool isSubsetOf(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size();
}
/// Расстояние Хемминга между множествами
/// (иными словами, добавленные и удаленные столбцы считаются один раз; столбцы, изменившие тип, - дважды).
size_t sizeOfDifference(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size();
}
Names getNames() const
{
Names res;
res.reserve(size());
for (const NameAndTypePair & column : *this)
{
res.push_back(column.name);
}
return res;
}
/// Оставить только столбцы, имена которых есть в names. В names могут быть лишние столбцы.
NamesAndTypesList filter(const NameSet & names) const
{
NamesAndTypesList res;
for (const NameAndTypePair & column : *this)
{
if (names.count(column.name))
res.push_back(column);
}
return res;
}
/// Оставить только столбцы, имена которых есть в names. В names могут быть лишние столбцы.
NamesAndTypesList filter(const Names & names) const
{
return filter(NameSet(names.begin(), names.end()));
}
};
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;
}

View File

@ -2,8 +2,8 @@
#include <list>
#include <queue>
#include <Poco/Thread.h>
#include <atomic>
#include <thread>
#include <Yandex/logger_useful.h>
@ -33,21 +33,15 @@ using Poco::SharedPtr;
*/
class UnionBlockInputStream : public IProfilingBlockInputStream
{
class Thread;
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false),
pushed_end_of_output_queue(false), all_read(false), log(&Logger::get("UnionBlockInputStream"))
output_queue(max_threads)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
for (size_t i = 0; i < inputs_.size(); ++i)
{
input_queue.push(InputData());
input_queue.back().in = inputs_[i];
input_queue.back().i = i;
}
input_queue.emplace(inputs_[i], i);
}
String getName() const { return "UnionBlockInputStream"; }
@ -116,7 +110,7 @@ public:
protected:
void finalize()
{
if (threads_data.empty())
if (threads.empty())
return;
LOG_TRACE(log, "Waiting for threads to finish");
@ -124,14 +118,11 @@ protected:
/// Вынем всё, что есть в очереди готовых данных.
output_queue.clear();
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер?
*/
/// В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
for (auto & thread : threads)
thread.join();
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
threads_data.clear();
threads.clear();
LOG_TRACE(log, "Waited for threads to finish");
}
@ -143,15 +134,11 @@ protected:
return res.block;
/// Запускаем потоки, если это ещё не было сделано.
if (threads_data.empty())
if (threads.empty())
{
threads_data.resize(max_threads);
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
{
it->runnable = new Thread(*this, current_memory_tracker);
it->thread = new Poco::Thread;
it->thread->start(*it->runnable);
}
threads.reserve(max_threads);
for (size_t i = 0; i < max_threads; ++i)
threads.emplace_back([=] { thread(current_memory_tracker); });
}
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
@ -173,8 +160,9 @@ protected:
/// Может быть, в очереди есть ещё эксепшен.
OutputData res;
while (output_queue.tryPop(res) && res.exception)
res.exception->rethrow();
while (output_queue.tryPop(res))
if (res.exception)
res.exception->rethrow();
finalize();
@ -188,132 +176,108 @@ private:
{
BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки).
InputData() {}
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
};
/// Данные отдельного потока
struct ThreadData
void thread(MemoryTracker * memory_tracker)
{
SharedPtr<Poco::Thread> thread;
SharedPtr<Thread> runnable;
};
current_memory_tracker = memory_tracker;
ExceptionPtr exception;
class Thread : public Poco::Runnable
{
public:
Thread(UnionBlockInputStream & parent_, MemoryTracker * memory_tracker_)
: parent(parent_), memory_tracker(memory_tracker_)
try
{
loop();
}
catch (...)
{
exception = cloneCurrentException();
}
void run()
if (exception)
{
current_memory_tracker = memory_tracker;
ExceptionPtr exception;
/// Отдаём эксепшен в основной поток.
output_queue.push(exception);
try
{
loop();
cancel();
}
catch (...)
{
exception = cloneCurrentException();
}
if (exception)
{
try
{
parent.cancel();
}
catch (...)
{
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
}
/// Отдаём эксепшен в основной поток.
parent.output_queue.push(exception);
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
}
}
}
void loop()
void loop()
{
while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{
while (!parent.finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
InputData input;
/// Выбираем следующий источник.
{
InputData input;
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/// Выбираем следующий источник.
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (input_queue.empty())
break;
input = input_queue.front();
/// Убираем источник из очереди доступных источников.
input_queue.pop();
}
/// Основная работа.
Block block = input.in->read();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
if (block)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
input_queue.push(input);
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (parent.input_queue.empty())
if (finish)
break;
input = parent.input_queue.front();
/// Убираем источник из очереди доступных источников.
parent.input_queue.pop();
output_queue.push(block);
}
/// Основная работа.
Block block = input.in->read();
else
{
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
++exhausted_inputs;
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
if (block)
/// Если все источники иссякли.
if (exhausted_inputs == children.size())
{
parent.input_queue.push(input);
if (parent.finish)
break;
parent.output_queue.push(block);
finish = true;
break;
}
else
{
++parent.exhausted_inputs;
/// Если все источники иссякли.
if (parent.exhausted_inputs == parent.children.size())
{
parent.finish = true;
break;
}
}
}
}
if (parent.finish)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет.
if (!parent.pushed_end_of_output_queue)
{
parent.pushed_end_of_output_queue = true;
parent.output_queue.push(OutputData());
}
}
}
private:
UnionBlockInputStream & parent;
MemoryTracker * memory_tracker;
};
if (finish)
{
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет; только один раз.
if (false == pushed_end_of_output_queue.exchange(true))
output_queue.push(OutputData());
}
}
unsigned max_threads;
/// Потоки.
typedef std::list<ThreadData> ThreadsData;
ThreadsData threads_data;
typedef std::vector<std::thread> ThreadsData;
ThreadsData threads;
/// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент.
typedef std::queue<InputData> InputQueue;
@ -334,19 +298,20 @@ private:
typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue;
/// Для операций с очередями.
/// Для операций с input_queue.
Poco::FastMutex mutex;
/// Сколько источников иссякло.
size_t exhausted_inputs;
size_t exhausted_inputs = 0;
/// Завершить работу потоков (раньше, чем иссякнут источники).
volatile bool finish;
std::atomic<bool> finish { false };
/// Положили ли в output_queue пустой блок.
volatile bool pushed_end_of_output_queue;
bool all_read;
std::atomic<bool> pushed_end_of_output_queue { false };
Logger * log;
bool all_read { false };
Logger * log = &Logger::get("UnionBlockInputStream");
};
}

View File

@ -18,8 +18,6 @@ typedef Poco::SharedPtr<IFunction> FunctionPtr;
typedef std::pair<std::string, std::string> NameWithAlias;
typedef std::vector<NameWithAlias> NamesWithAliases;
typedef std::unordered_set<String> NameSet;
typedef std::unordered_map<String, String> NameToNameMap;
class Join;

View File

@ -30,13 +30,14 @@ public:
int type;
/** В запросе ADD COLUMN здесь хранится имя и тип добавляемого столбца
* В запросе DROP это поле не используется
* В запросе MODIFY здесь хранится имя столбца и новый тип
*/
* В запросе DROP это поле не используется
* В запросе MODIFY здесь хранится имя столбца и новый тип
*/
ASTPtr name_type;
/** В запросе ADD COLUMN здесь опционально хранится имя столбца, следующее после AFTER
* В запросе DROP здесь хранится имя столбца для удаления
*/
* В запросе DROP здесь хранится имя столбца для удаления
*/
ASTPtr column;
/// deep copy

View File

@ -55,6 +55,14 @@ public:
*/
void check(const Names & column_names) const;
/** Проверить, что все запрошенные имена есть в таблице и имеют правильные типы.
*/
void check(const NamesAndTypesList & columns) const;
/** Проверить, что все имена из пересечения names и columns есть в таблице и имеют одинаковые типы.
*/
void check(const NamesAndTypesList & columns, const Names & column_names) const;
/** Проверить, что блок с данными для записи содержит все столбцы таблицы с правильными типами,
* содержит только столбцы таблицы, и все столбцы различны.
* Если need_all, еще проверяет, что все столбцы таблицы есть в блоке.

View File

@ -21,11 +21,16 @@ public:
:
path(path_), block_size(block_size_), column_names(column_names_),
storage(storage_), owned_data_part(owned_data_part_),
part_columns_lock(new Poco::ScopedReadRWLock(owned_data_part->columns_lock)),
all_mark_ranges(mark_ranges_), remaining_mark_ranges(mark_ranges_),
use_uncompressed_cache(use_uncompressed_cache_),
prewhere_actions(prewhere_actions_), prewhere_column(prewhere_column_),
log(&Logger::get("MergeTreeBlockInputStream"))
{
/// Под owned_data_part->columns_lock проверим, что все запрошенные столбцы в куске того же типа, что в таблице.
/// Это может быть не так во время ALTER MODIFY.
storage.check(owned_data_part->columns, column_names);
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
if (prewhere_actions)
@ -246,6 +251,9 @@ protected:
* буферы не висели в памяти.
*/
reader.reset();
pre_reader.reset();
part_columns_lock.reset();
owned_data_part.reset();
}
return res;
@ -258,7 +266,8 @@ private:
NameSet column_name_set;
Names pre_column_names;
MergeTreeData & storage;
const MergeTreeData::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
MergeTreeData::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
std::unique_ptr<Poco::ScopedReadRWLock> part_columns_lock; /// Не дадим изменить список столбцов куска, пока мы из него читаем.
MarkRanges all_mark_ranges; /// В каких диапазонах засечек читать. В порядке возрастания номеров.
MarkRanges remaining_mark_ranges; /// В каких диапазонах засечек еще не прочли.
/// В порядке убывания номеров, чтобы можно было выбрасывать из конца.

View File

@ -40,6 +40,7 @@ namespace DB
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
* Внутри директории с куском:
* checksums.txt - список файлов с их размерами и контрольными суммами.
* columns.txt - список столбцов с их типами.
* primary.idx - индексный файл.
* Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
@ -105,8 +106,12 @@ struct MergeTreeSettings
/// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний.
double insert_delay_step = 1.1;
/// Для скольки блоков, вставленных с непустым insert ID, хранить хеши в ZooKeeper.
/// Для скольки последних блоков хранить хеши в ZooKeeper.
size_t replicated_deduplication_window = 10000;
/// Хранить примерно столько последних записей в логе в ZooKeeper, даже если они никому уже не нужны.
/// Не влияет на работу таблиц; используется только чтобы успеть посмотреть на лог в ZooKeeper глазами прежде, чем его очистят.
size_t replicated_logs_to_keep = 100;
};
class MergeTreeData : public ITableDeclaration
@ -210,7 +215,8 @@ public:
MergeTreeData & storage;
size_t size; /// в количестве засечек.
size_t size_in_bytes; /// размер в байтах, 0 - если не посчитано
std::atomic<size_t> size_in_bytes; /// размер в байтах, 0 - если не посчитано;
/// atomic, чтобы можно было не заботиться о блокировках при ALTER.
time_t modification_time;
mutable time_t remove_time; /// Когда кусок убрали из рабочего набора.
@ -220,6 +226,24 @@ public:
Checksums checksums;
/// Описание столбцов.
NamesAndTypesList columns;
/** Блокируется на запись при изменении columns, checksums или любых файлов куска.
* Блокируется на чтение при чтении columns, checksums или любых файлов куска.
*/
mutable Poco::RWLock columns_lock;
/** Берется на все время ALTER куска: от начала записи временных фалов до их переименования в постоянные.
* Берется при разлоченном columns_lock.
*
* NOTE: "Можно" было бы обойтись без этого мьютекса, если бы можно было превращать ReadRWLock в WriteRWLock, не снимая блокировку.
* Такое превращение невозможно, потому что создало бы дедлок, если делать его из двух потоков сразу.
* Взятие этого мьютекса означает, что мы хотим заблокировать columns_lock на чтение с намерением потом, не
* снимая блокировку, заблокировать его на запись.
*/
mutable Poco::FastMutex alter_mutex;
/// NOTE можно загружать засечки тоже в оперативку
/// Вычисляем сумарный размер всей директории со всеми файлами
@ -261,7 +285,7 @@ public:
{
/// Размер - в количестве засечек.
if (!size)
size = Poco::File(storage.full_path + name + "/" + escapeForFileName(storage.columns->front().name) + ".mrk")
size = Poco::File(storage.full_path + name + "/" + escapeForFileName(columns.front().name) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE;
size_t key_size = storage.sort_descr.size();
@ -282,15 +306,34 @@ public:
}
/// Прочитать контрольные суммы, если есть.
bool loadChecksums()
void loadChecksums()
{
String path = storage.full_path + name + "/checksums.txt";
if (!Poco::File(path).exists())
return false;
{
if (storage.require_part_metadata)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
return;
}
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
if (checksums.readText(file))
assertEOF(file);
return true;
}
void loadColumns()
{
String path = storage.full_path + name + "/columns.txt";
if (!Poco::File(path).exists())
{
if (storage.require_part_metadata)
throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
columns = *storage.columns;
return;
}
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
columns.readText(file, storage.context.getDataTypeFactory());
}
void checkNotBroken()
@ -299,6 +342,20 @@ public:
if (!checksums.empty())
{
if (!checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.require_part_metadata)
{
for (const NameAndTypePair & it : columns)
{
String name = escapeForFileName(it.name);
if (!checksums.files.count(name + ".mrk") ||
!checksums.files.count(name + ".bin"))
throw Exception("No .mrk or .bin file checksum for column " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
}
}
checksums.checkSizes(path + "/");
}
else
@ -313,9 +370,9 @@ public:
/// Проверяем, что все засечки непусты и имеют одинаковый размер.
ssize_t marks_size = -1;
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
for (const NameAndTypePair & it : columns)
{
Poco::File marks_file(path + "/" + escapeForFileName(it->name) + ".mrk");
Poco::File marks_file(path + "/" + escapeForFileName(it.name) + ".mrk");
/// При добавлении нового столбца в таблицу файлы .mrk не создаются. Не будем ничего удалять.
if (!marks_file.exists())
@ -405,9 +462,19 @@ public:
private:
friend class MergeTreeData;
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {}
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_), alter_lock(data_part->alter_mutex) {}
void clear()
{
alter_lock.unlock();
data_part = nullptr;
}
DataPartPtr data_part;
Poco::ScopedLockWithUnlock<Poco::FastMutex> alter_lock;
DataPart::Checksums new_checksums;
NamesAndTypesList new_columns;
/// Если значение - пустая строка, файл нужно удалить, и он не временный.
NameToNameMap rename_map;
};
@ -430,6 +497,7 @@ public:
* primary_expr_ast - выражение для сортировки;
* date_column_name - имя столбца с датой;
* index_granularity - на сколько строчек пишется одно значение индекса.
* require_part_metadata - обязательно ли в директории с куском должны быть checksums.txt и columns.txt
*/
MergeTreeData( const String & full_path_, NamesAndTypesListPtr columns_,
const Context & context_,
@ -440,7 +508,9 @@ public:
Mode mode_,
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_);
const String & log_name_,
bool require_part_metadata_
);
std::string getModePrefix() const;
@ -451,7 +521,7 @@ public:
UInt64 getMaxDataPartIndex();
std::string getTableName() const {
return "abc";//throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR);
}
const NamesAndTypesList & getColumnsList() const { return *columns; }
@ -554,6 +624,8 @@ public:
const ASTPtr primary_expr_ast;
private:
bool require_part_metadata;
ExpressionActionsPtr primary_expr;
SortDescription sort_descr;
Block primary_key_sample;

View File

@ -211,7 +211,6 @@ public:
index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = new HashingWriteBuffer(*index_file_stream);
columns_list = storage.getColumnsList();
for (const auto & it : columns_list)
addStream(part_path, it.name, *it.type);
}
@ -283,11 +282,18 @@ public:
/// Кусок пустой - все записи удалились.
Poco::File(part_path).remove(true);
checksums.files.clear();
return checksums;
}
else
{
/// Записываем файл с описанием столбцов.
WriteBufferFromFile out(part_path + "columns.txt", 4096);
columns_list.writeText(out);
}
{
/// Записываем файл с чексуммами.
WriteBufferFromFile out(part_path + "checksums.txt", 1024);
WriteBufferFromFile out(part_path + "checksums.txt", 4096);
checksums.writeText(out);
}

View File

@ -62,6 +62,11 @@ public:
"",
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
@ -75,7 +80,7 @@ public:
}
storage.checkPartAndAddToZooKeeper(part, ops);
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/log/log-",
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));

View File

@ -25,9 +25,13 @@ public:
MergeTreeData::DataPartPtr part = findPart(part_name);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
@ -107,7 +111,8 @@ public:
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
if (file_name != "checksums.txt")
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}
@ -117,8 +122,9 @@ public:
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->name = "tmp_" + part_name;
new_data_part->modification_time = time(0);
new_data_part->loadIndex();
new_data_part->loadColumns();
new_data_part->loadChecksums();
new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false);

View File

@ -177,6 +177,7 @@ private:
*/
StringSet future_parts;
String database_name;
String table_name;
String full_path;
@ -188,6 +189,11 @@ private:
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
/** Версия ноды /columns в ZooKeeper, соответствующая текущим data.columns.
* Читать и изменять вместе с data.columns - под TableStructureLock.
*/
int columns_version = -1;
/** Случайные данные, которые мы записали в /replicas/me/is_active.
*/
String active_node_identifier;
@ -212,6 +218,7 @@ private:
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;
zkutil::EventPtr queue_updating_event = zkutil::EventPtr(new Poco::Event);
/// Задание, выполняющее действия из очереди.
BackgroundProcessingPool::TaskHandle queue_task_handle;
@ -220,12 +227,16 @@ private:
std::thread merge_selecting_thread;
Poco::Event merge_selecting_event;
/// Поток, удаляющий информацию о старых блоках из ZooKeeper.
std::thread clear_old_blocks_thread;
/// Поток, удаляющий старые куски, записи в логе и блоки.
std::thread cleanup_thread;
/// Поток, обрабатывающий переподключение к ZooKeeper при истечении сессии (очень маловероятное событие).
std::thread restarting_thread;
/// Поток, следящий за изменениями списка столбцов в ZooKeeper и обновляющего куски в соответствии с этими изменениями.
std::thread alter_thread;
zkutil::EventPtr alter_thread_event = zkutil::EventPtr(new Poco::Event);
/// Когда последний раз выбрасывали старые логи из ZooKeeper.
time_t clear_old_logs_time = 0;
@ -270,14 +281,14 @@ private:
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/
void checkTableStructure();
void checkTableStructure(bool skip_sanity_checks);
/** Проверить, что множество кусков соответствует тому, что в ZK (/replicas/me/parts/).
* Если каких-то кусков, описанных в ZK нет локально, бросить исключение.
* Если какие-то локальные куски не упоминаются в ZK, удалить их.
* Но если таких слишком много, на всякий случай бросить исключение - скорее всего, это ошибка конфигурации.
*/
void checkParts();
void checkParts(bool skip_sanity_checks);
/// Положить все куски из data в virtual_parts.
void initVirtualParts();
@ -294,6 +305,7 @@ private:
* Если ни у кого нет такого куска, ничего не проверяет.
* Не очень надежно: если две реплики добавляют кусок почти одновременно, ни одной проверки не произойдет.
* Кладет в ops действия, добавляющие данные о куске в ZooKeeper.
* Вызывать под TableStructureLock.
*/
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops);
@ -312,8 +324,9 @@ private:
void loadQueue();
/** Копирует новые записи из логов всех реплик в очередь этой реплики.
* Если next_update_event != nullptr, вызовет это событие, когда в логе появятся новые записи.
*/
void pullLogsToQueue();
void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr);
/** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое.
* Вызывается под queue_mutex.
@ -342,7 +355,7 @@ private:
/** В бесконечном цикле вызывает clearOldBlocks.
*/
void clearOldBlocksThread();
void cleanupThread();
/** В бесконечном цикле проверяет, не протухла ли сессия в ZooKeeper.
*/
@ -360,7 +373,6 @@ private:
/** Скачать указанный кусок с указанной реплики.
*/
void fetchPart(const String & part_name, const String & replica_name);
};
}

View File

@ -151,8 +151,6 @@ void ExpressionAction::prepare(Block & sample_block)
{
for (const auto & col : columns_added_by_join)
sample_block.insert(ColumnWithNameAndType(col.type->createColumn(), col.type, col.name));
std::cerr << sample_block.dumpNames() << std::endl;
}
else if (type == ADD_COLUMN)
{
@ -264,8 +262,6 @@ void ExpressionAction::execute(Block & block) const
{
Block new_block;
//std::cerr << block.dumpNames() << std::endl;
for (size_t i = 0; i < projection.size(); ++i)
{
const std::string & name = projection[i].first;

View File

@ -1245,8 +1245,8 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
Names join_key_names_right(join_key_names_right_set.begin(), join_key_names_right_set.end());
JoinPtr join = new Join(join_key_names_left, join_key_names_right, settings.limits, ast_join.kind, ast_join.strictness);
for (const auto & name_type : columns_added_by_join)
std::cerr << "! Column added by JOIN: " << name_type.name << std::endl;
/* for (const auto & name_type : columns_added_by_join)
std::cerr << "! Column added by JOIN: " << name_type.name << std::endl;*/
Names required_joined_columns(join_key_names_right.begin(), join_key_names_right.end());
for (const auto & name_type : columns_added_by_join)
@ -1596,8 +1596,6 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
if (!select_query || !select_query->join)
return;
std::cerr << "collectJoinedColumns" << std::endl;
auto & node = typeid_cast<ASTJoin &>(*select_query->join);
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
auto & table = node.table->children.at(0); /// TODO: поддержка идентификаторов.
@ -1626,14 +1624,14 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
}
}
for (const auto & name : join_key_names_left_set)
/* for (const auto & name : join_key_names_left_set)
std::cerr << "JOIN key (left): " << name << std::endl;
for (const auto & name : join_key_names_right_set)
std::cerr << "JOIN key (right): " << name << std::endl;
std::cerr << std::endl;
for (const auto & name : joined_columns)
std::cerr << "JOINed column: " << name << std::endl;
std::cerr << std::endl;
std::cerr << std::endl;*/
}
Names ExpressionAnalyzer::getRequiredColumns()

View File

@ -87,7 +87,7 @@ AlterCommands InterpreterAlterQuery::parseAlter(
}
void InterpreterAlterQuery::updateMetadata(
const String& database_name, const String& table_name, const NamesAndTypesList& columns, Context& context)
const String & database_name, const String & table_name, const NamesAndTypesList & columns, Context & context)
{
String path = context.getPath();

View File

@ -737,6 +737,13 @@ void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent, bo
s << (hilite ? hilite_keyword : "") << indent_str << "DROP COLUMN " << (hilite ? hilite_none : "");
formatAST(*p.column, s, indent, hilite, true);
}
else if (p.type == ASTAlterQuery::MODIFY)
{
s << (hilite ? hilite_keyword : "") << indent_str << "MODIFY COLUMN " << (hilite ? hilite_none : "");
formatAST(*p.name_type, s, indent, hilite, true);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
std::string comma = (i < (ast.parameters.size() -1) ) ? "," : "";
s << (hilite ? hilite_keyword : "") << indent_str << comma << (hilite ? hilite_none : "");

View File

@ -171,10 +171,20 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
}
void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerResponse & response)
void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
try
{
/** Если POST и Keep-Alive, прочитаем тело до конца.
* Иначе вместо следующего запроса, будет прочитан кусок этого тела.
*/
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
&& response.getKeepAlive()
&& !request.stream().eof())
{
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
}
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << s.str() << std::endl;
@ -214,26 +224,26 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
s << "Code: " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
LOG_ERROR(log, s.str());
trySendExceptionToClient(s, response);
trySendExceptionToClient(s, request, response);
}
catch (Poco::Exception & e)
{
std::stringstream s;
s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
trySendExceptionToClient(s, response);
trySendExceptionToClient(s, request, response);
}
catch (std::exception & e)
{
std::stringstream s;
s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
trySendExceptionToClient(s, response);
trySendExceptionToClient(s, request, response);
}
catch (...)
{
std::stringstream s;
s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
trySendExceptionToClient(s, response);
trySendExceptionToClient(s, request, response);
}
}

View File

@ -17,7 +17,7 @@ public:
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerResponse & response);
void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
private:
Server & server;

View File

@ -35,7 +35,15 @@ public:
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
response.send() << "Ok." << std::endl;
try
{
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingRequestHandler");
}
}
};

View File

@ -138,6 +138,71 @@ void ITableDeclaration::check(const Names & column_names) const
}
void ITableDeclaration::check(const NamesAndTypesList & columns) const
{
const NamesAndTypesList & available_columns = getColumnsList();
const NamesAndTypesMap & columns_map = getColumnsMap(available_columns);
typedef google::dense_hash_set<StringRef, StringRefHash> UniqueStrings;
UniqueStrings unique_names;
unique_names.set_empty_key(StringRef());
for (const NameAndTypePair & column : columns)
{
NamesAndTypesMap::const_iterator it = columns_map.find(column.name);
if (columns_map.end() == it)
throw Exception("There is no column with name " + column.name + ". There are columns: "
+ listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (column.type->getName() != it->second->getName())
throw Exception("Type mismatch for column " + column.name + ". Column has type "
+ it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(column.name))
throw Exception("Column " + column.name + " queried more than once",
ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(column.name);
}
}
void ITableDeclaration::check(const NamesAndTypesList & columns, const Names & column_names) const
{
const NamesAndTypesList & available_columns = getColumnsList();
const NamesAndTypesMap & available_columns_map = getColumnsMap(available_columns);
const NamesAndTypesMap & provided_columns_map = getColumnsMap(columns);
if (column_names.empty())
throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns),
ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
typedef google::dense_hash_set<StringRef, StringRefHash> UniqueStrings;
UniqueStrings unique_names;
unique_names.set_empty_key(StringRef());
for (const String & name : column_names)
{
NamesAndTypesMap::const_iterator it = provided_columns_map.find(name);
if (provided_columns_map.end() == it)
continue;
NamesAndTypesMap::const_iterator jt = available_columns_map.find(name);
if (available_columns_map.end() == jt)
throw Exception("There is no column with name " + name + ". There are columns: "
+ listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (it->second->getName() != jt->second->getName())
throw Exception("Type mismatch for column " + name + ". Column has type "
+ jt->second->getName() + ", got type " + it->second->getName(), ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(name))
throw Exception("Column " + name + " queried more than once",
ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(name);
}
}
void ITableDeclaration::check(const Block & block, bool need_all) const
{
const NamesAndTypesList & available_columns = getColumnsList();

View File

@ -9,6 +9,7 @@
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <algorithm>
@ -25,12 +26,14 @@ MergeTreeData::MergeTreeData(
Mode mode_,
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_)
const String & log_name_,
bool require_part_metadata_)
: context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
mode(mode_), sign_column(sign_column_),
settings(settings_), primary_expr_ast(primary_expr_ast_->clone()),
require_part_metadata(require_part_metadata_),
full_path(full_path_), columns(columns_), log_name(log_name_),
log(&Logger::get(log_name + " (Data)"))
{
@ -128,8 +131,9 @@ void MergeTreeData::loadDataParts()
try
{
part->loadIndex();
part->loadColumns();
part->loadChecksums();
part->loadIndex();
part->checkNotBroken();
}
catch (...)
@ -419,11 +423,11 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPart
{
ExpressionActionsPtr expression;
AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part));
createConvertExpression(part, *columns, new_columns, expression, transaction->rename_map); // TODO: part->columns
createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map);
if (transaction->rename_map.empty())
{
transaction->data_part = nullptr;
transaction->clear();
return transaction;
}
@ -464,11 +468,20 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPart
/// Запишем обновленные контрольные суммы во временный файл
if (!part->checksums.empty())
{
transaction->new_checksums = new_checksums;
WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
new_checksums.writeText(checksums_file);
transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
}
/// Запишем обновленный список столбцов во временный файл.
{
transaction->new_columns = new_columns.filter(part->columns.getNames());
WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
}
return transaction;
}
@ -478,25 +491,45 @@ void MergeTreeData::AlterDataPartTransaction::commit()
return;
try
{
Poco::ScopedWriteRWLock lock(data_part->columns_lock);
String path = data_part->storage.full_path + data_part->name + "/";
/// 1) Переименуем старые файлы.
for (auto it : rename_map)
{
if (it.second.empty())
{
Poco::File(path + it.first).renameTo(path + it.first + ".removing");
Poco::File(path + it.first + ".removing").remove();
}
else
String name = it.second.empty() ? it.first : it.second;
Poco::File(path + name).renameTo(path + name + ".tmp2");
}
/// 2) Переместим на их место новые и обновим метаданные в оперативке.
for (auto it : rename_map)
{
if (!it.second.empty())
{
Poco::File(path + it.first).renameTo(path + it.second);
}
}
data_part = nullptr;
DataPart & mutable_part = const_cast<DataPart &>(*data_part);
mutable_part.checksums = new_checksums;
mutable_part.columns = new_columns;
/// 3) Удалим старые файлы.
for (auto it : rename_map)
{
String name = it.second.empty() ? it.first : it.second;
Poco::File(path + name + ".tmp2").remove();
}
mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);
clear();
}
catch (...)
{
/// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
data_part = nullptr;
clear();
throw;
}
}

View File

@ -253,10 +253,17 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
{
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
Names all_column_names;
NameSet union_columns_set;
for (const MergeTreeData::DataPartPtr & part : parts)
{
Poco::ScopedReadRWLock part_lock(part->columns_lock);
Names part_columns = part->columns.getNames();
union_columns_set.insert(part_columns.begin(), part_columns.end());
}
NamesAndTypesList columns_list = data.getColumnsList();
for (const auto & it : columns_list)
all_column_names.push_back(it.name);
NamesAndTypesList union_columns = columns_list.filter(union_columns_set);
Names union_column_names = union_columns.getNames();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
@ -271,7 +278,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, all_column_names, data,
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, ""), data.getPrimaryExpression()));
}
@ -304,7 +311,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, data.getColumnsList());
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, union_columns);
merged_stream->readPrefix();
to->writePrefix();
@ -317,8 +324,8 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
throw Exception("Canceled merging parts", ErrorCodes::ABORTED);
merged_stream->readSuffix();
new_data_part->columns = union_columns;
new_data_part->checksums = to->writeSuffixAndGetChecksums();
new_data_part->index.swap(to->getIndex());
/// Для удобства, даже CollapsingSortedBlockInputStream не может выдать ноль строк.

View File

@ -97,7 +97,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
/// Сортируем.
stableSortBlock(block, sort_descr);
MergedBlockOutputStream out(data, part_tmp_path, block.getColumnsList());
NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
MergedBlockOutputStream out(data, part_tmp_path, columns);
out.getIndex().reserve(part_size * sort_descr.size());
out.writePrefix();
@ -115,8 +116,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
new_data_part->modification_time = time(0);
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
new_data_part->index.swap(out.getIndex());
new_data_part->columns = columns;
new_data_part->checksums = checksums;
new_data_part->index.swap(out.getIndex());
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(part_tmp_path);
return new_data_part;

View File

@ -20,7 +20,7 @@ StorageMergeTree::StorageMergeTree(const String & path_, const String & database
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), increment(full_path + "increment.txt"),
background_pool(context_.getBackgroundPool()),
data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_,mode_, sign_column_, settings_, database_name_ + "." + name),
index_granularity_,mode_, sign_column_, settings_, database_name_ + "." + name, false),
reader(data), writer(data), merger(data),
log(&Logger::get(database_name_ + "." + name + " (StorageMergeTree)")),
shutdown_called(false)
@ -135,7 +135,7 @@ void StorageMergeTree::alter(const AlterCommands & params, const String & databa
bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context * pool_context)
{
auto structure_lock = lockStructure(false);
auto structure_lock = lockStructure(true);
/// Удаляем старые куски.
data.clearOldParts();

View File

@ -4,17 +4,16 @@
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <time.h>
namespace DB
{
const auto QUEUE_UPDATE_SLEEP_MS = 5 * 1000;
const auto QUEUE_NO_WORK_SLEEP = std::chrono::seconds(5);
const auto QUEUE_ERROR_SLEEP = std::chrono::seconds(1);
const auto QUEUE_AFTER_WORK_SLEEP = std::chrono::seconds(0);
const auto ERROR_SLEEP_MS = 1000;
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
const auto CLEANUP_SLEEP_MS = 30 * 1000;
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
@ -32,11 +31,11 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & sign_column_,
const MergeTreeSettings & settings_)
:
context(context_), zookeeper(context.getZooKeeper()),
context(context_), zookeeper(context.getZooKeeper()), database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
replica_name(replica_name_),
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name),
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
shutdown_event(false), permanent_shutdown_event(false)
@ -56,13 +55,23 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if (!zookeeper->exists(zookeeper_path))
createTable();
checkTableStructure();
checkTableStructure(false);
createReplica();
}
else
{
checkTableStructure();
checkParts();
bool skip_sanity_checks = false;
if (zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_checks = true;
zookeeper->remove(replica_path + "/flags/force_restore_data");
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
<< replica_path << "/flags/force_restore_data).");
}
checkTableStructure(skip_sanity_checks);
checkParts(skip_sanity_checks);
}
initVirtualParts();
@ -74,7 +83,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
LOG_INFO(log, "Have unreplicated data");
unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_, context_, primary_expr_ast_,
date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_,
database_name_ + "." + table_name + "[unreplicated]"));
database_name_ + "." + table_name + "[unreplicated]", false));
unreplicated_reader.reset(new MergeTreeDataSelectExecutor(*unreplicated_data));
unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data));
}
@ -131,7 +140,7 @@ void StorageReplicatedMergeTree::createTable()
zookeeper->create(zookeeper_path, "", zkutil::CreateMode::Persistent);
/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними параметры таблицы.
std::stringstream metadata;
metadata << "metadata format version: 1" << std::endl;
metadata << "date column: " << data.date_column_name << std::endl;
@ -140,30 +149,23 @@ void StorageReplicatedMergeTree::createTable()
metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
metadata << "sign column: " << data.sign_column << std::endl;
metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
metadata << "columns:" << std::endl;
WriteBufferFromOStream buf(metadata);
for (auto & it : data.getColumnsList())
{
writeBackQuotedString(it.name, buf);
writeChar(' ', buf);
writeString(it.type->getName(), buf);
writeChar('\n', buf);
}
buf.next();
zookeeper->create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/columns", data.getColumnsList().toString(), zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
/// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все остальные ноды не созданы.
zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
}
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/
void StorageReplicatedMergeTree::checkTableStructure()
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks)
{
String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
ReadBufferFromString buf(metadata_str);
@ -179,20 +181,31 @@ void StorageReplicatedMergeTree::checkTableStructure()
assertString("\nsign column: ", buf);
assertString(data.sign_column, buf);
assertString("\nprimary key: ", buf);
/// NOTE: Можно сделать менее строгую проверку совпадения выражений, чтобы таблицы не ломались от небольших изменений
/// в коде formatAST.
assertString(formattedAST(data.primary_expr_ast), buf);
assertString("\ncolumns:\n", buf);
for (auto & it : data.getColumnsList())
{
String name;
readBackQuotedString(name, buf);
if (name != it.name)
throw Exception("Unexpected column name in ZooKeeper: expected " + it.name + ", found " + name,
ErrorCodes::UNKNOWN_IDENTIFIER);
assertString(" ", buf);
assertString(it.type->getName(), buf);
assertString("\n", buf);
}
assertString("\n", buf);
assertEOF(buf);
zkutil::Stat stat;
auto columns = NamesAndTypesList::parse(zookeeper->get(zookeeper_path + "/columns", &stat), context.getDataTypeFactory());
columns_version = stat.version;
if (columns != data.getColumnsList())
{
if (data.getColumnsList().sizeOfDifference(columns) <= 2 || skip_sanity_checks)
{
LOG_WARNING(log, "Table structure in ZooKeeper is a little different from local table structure. Assuming ALTER.");
/// Без всяких блокировок, потому что таблица еще не создана.
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns, context);
data.setColumnsList(columns);
}
else
{
throw Exception("Table structure in ZooKeeper is very different from local table structure.",
ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
}
void StorageReplicatedMergeTree::createReplica()
@ -208,47 +221,23 @@ void StorageReplicatedMergeTree::createReplica()
/// Создадим пустую реплику.
zookeeper->create(replica_path, "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
/// Если таблица пуста, больше ничего делать не нужно.
/** Нужно изменить данные ноды /replicas на что угодно, чтобы поток, удаляющий старые записи в логе,
* споткнулся об это изменение и не удалил записи, которые мы еще не прочитали.
*/
zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name);
if (replicas.empty())
{
LOG_DEBUG(log, "No other replicas");
return;
}
/// "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатели на логи.
String source_replica = replicas[0];
/** Дождемся, пока все активные реплики заметят появление этой реплики.
* Это не даст им удалять записи из своих логов, пока эта реплика их не скопирует.
*/
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to acknowledge me");
bool active = true;
while(true)
{
zkutil::EventPtr event = new Poco::Event;
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active", nullptr, event))
{
active = false;
break;
}
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, nullptr, event))
break;
event->wait();
}
/// Будем предпочитать активную реплику в качестве эталонной.
if (active)
source_replica = replica;
}
/// "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатель на лог.
String source_replica = replicas[rand() % replicas.size()];
LOG_INFO(log, "Will mimic " << source_replica);
@ -256,13 +245,8 @@ void StorageReplicatedMergeTree::createReplica()
/// Порядок следующих трех действий важен. Записи в логе могут продублироваться, но не могут потеряться.
/// Скопируем у эталонной реплики ссылки на все логи.
for (const String & replica : replicas)
{
String pointer;
if (zookeeper->tryGet(source_path + "/log_pointers/" + replica, pointer))
zookeeper->create(replica_path + "/log_pointers/" + replica, pointer, zkutil::CreateMode::Persistent);
}
/// Скопируем у эталонной реплики ссылку на лог.
zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));
/// Запомним очередь эталонной реплики.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
@ -339,7 +323,7 @@ void StorageReplicatedMergeTree::activateReplica()
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", *zookeeper);
}
void StorageReplicatedMergeTree::checkParts()
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
@ -391,13 +375,6 @@ void StorageReplicatedMergeTree::checkParts()
for (const String & name : parts_to_fetch)
expected_parts.erase(name);
bool skip_sanity_check = false;
if (zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_check = true;
zookeeper->remove(replica_path + "/flags/force_restore_data");
}
String sanity_report =
"There are " + toString(unexpected_parts.size()) + " unexpected parts, "
+ toString(parts_to_add.size()) + " unexpectedly merged parts, "
@ -409,18 +386,18 @@ void StorageReplicatedMergeTree::checkParts()
expected_parts.size() > 20 ||
parts_to_fetch.size() > 2;
if (skip_sanity_check)
{
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts (flag "
<< replica_path << "/flags/force_restore_data). " << sanity_report);
}
else if (insane)
if (insane && !skip_sanity_checks)
{
throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. "
+ sanity_report,
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
}
if (insane)
{
LOG_WARNING(log, sanity_report);
}
/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
for (MergeTreeData::DataPartPtr part : parts_to_add)
{
@ -437,6 +414,7 @@ void StorageReplicatedMergeTree::checkParts()
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
zookeeper->multi(ops);
@ -454,6 +432,7 @@ void StorageReplicatedMergeTree::checkParts()
/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
ops.push_back(new zkutil::Op::Create(
@ -480,22 +459,54 @@ void StorageReplicatedMergeTree::initVirtualParts()
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
String another_replica = findReplicaHavingPart(part->name, false);
if (!another_replica.empty())
check(part->columns);
int expected_columns_version = columns_version;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
std::random_shuffle(replicas.begin(), replicas.end());
String expected_columns_str = part->columns.toString();
for (const String & replica : replicas)
{
String checksums_str;
if (zookeeper->tryGet(zookeeper_path + "/replicas/" + another_replica + "/parts/" + part->name + "/checksums", checksums_str))
zkutil::Stat stat_before, stat_after;
String columns_str;
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", columns_str, &stat_before))
continue;
if (columns_str != expected_columns_str)
{
auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
checksums.checkEqual(part->checksums, true);
LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica
<< " because columns are different");
continue;
}
String checksums_str;
/// Проверим, что версия ноды со столбцами не изменилась, пока мы читали checksums.
/// Это гарантирует, что столбцы и чексуммы относятся к одним и тем же данным.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/checksums", checksums_str) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", &stat_after) ||
stat_before.version != stat_after.version)
{
LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica
<< " because part changed while we were reading its checksums");
continue;
}
auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
checksums.checkEqual(part->checksums, true);
}
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/columns",
expected_columns_version));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name,
"",
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/columns",
part->columns.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums",
part->checksums.toString(),
@ -510,6 +521,7 @@ void StorageReplicatedMergeTree::clearOldParts()
for (const String & name : parts)
{
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
int32_t code = zookeeper->tryMulti(ops);
@ -523,31 +535,50 @@ void StorageReplicatedMergeTree::clearOldParts()
void StorageReplicatedMergeTree::clearOldLogs()
{
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
zkutil::Stat stat;
if (!zookeeper->exists(zookeeper_path + "/log", &stat))
throw Exception(zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// Будем ждать, пока накопятся в 1.1 раза больше записей, чем нужно.
if (static_cast<double>(children_count) < data.settings.replicated_logs_to_keep * 1.1)
return;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer;
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
String pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
if (pointer.empty())
return;
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
}
Strings entries = zookeeper->getChildren(replica_path + "/log");
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
std::sort(entries.begin(), entries.end());
/// Не будем трогать последние replicated_logs_to_keep записей.
entries.erase(entries.end() - std::min(entries.size(), data.settings.replicated_logs_to_keep), entries.end());
size_t removed = 0;
zkutil::Ops ops;
/// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик.
ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version));
for (const String & entry : entries)
{
UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
if (index >= min_pointer)
break;
zookeeper->remove(replica_path + "/log/" + entry);
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/log/" + entry, -1));
++removed;
}
if (removed > 0)
LOG_DEBUG(log, "Removed " << removed << " old log entries");
if (removed == 0)
return;
zookeeper->multi(ops);
LOG_DEBUG(log, "Removed " << removed << " old log entries");
}
void StorageReplicatedMergeTree::clearOldBlocks()
@ -563,7 +594,7 @@ void StorageReplicatedMergeTree::clearOldBlocks()
return;
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
<< " old blocks from ZooKeeper");
<< " old blocks from ZooKeeper. This might take several minutes.");
Strings blocks = zookeeper->getChildren(zookeeper_path + "/blocks");
@ -581,6 +612,7 @@ void StorageReplicatedMergeTree::clearOldBlocks()
{
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
zookeeper->multi(ops);
@ -605,110 +637,73 @@ void StorageReplicatedMergeTree::loadQueue()
}
}
void StorageReplicatedMergeTree::pullLogsToQueue()
/// Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper.
static String padIndex(UInt64 index)
{
String index_str = toString(index);
while (index_str.size() < 10)
index_str = '0' + index_str;
return index_str;
}
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/// Сольем все логи в хронологическом порядке.
String index_str = zookeeper->get(replica_path + "/log_pointer");
UInt64 index;
struct LogIterator
if (index_str.empty())
{
String replica; /// Имя реплики.
UInt64 index; /// Номер записи в логе (суффикс имени ноды).
/// Если у нас еще нет указателя на лог, поставим указатель на первую запись в нем.
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
index = entries.empty() ? 0 : parse<UInt64>(std::min_element(entries.begin(), entries.end())->substr(strlen("log-")));
Int64 timestamp; /// Время (czxid) создания записи в логе.
String entry_str; /// Сама запись.
bool operator<(const LogIterator & rhs) const
{
/// Нужно доставать из очереди минимальный timestamp.
return timestamp > rhs.timestamp;
}
bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
{
String index_str = toString(index);
while (index_str.size() < 10)
index_str = '0' + index_str;
zkutil::Stat stat;
if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
return false;
timestamp = stat.czxid;
return true;
}
};
typedef std::priority_queue<LogIterator> PriorityQueue;
PriorityQueue priority_queue;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
zookeeper->set(replica_path + "/log_pointer", toString(index), zkutil::CreateMode::Persistent);
}
else
{
String index_str;
UInt64 index;
if (zookeeper->tryGet(replica_path + "/log_pointers/" + replica, index_str))
{
index = parse<UInt64>(index_str);
}
else
{
/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
Strings entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/log");
std::sort(entries.begin(), entries.end());
index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
zookeeper->create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
}
LogIterator iterator;
iterator.replica = replica;
iterator.index = index;
if (iterator.readEntry(*zookeeper, zookeeper_path))
priority_queue.push(iterator);
index = parse<UInt64>(index_str);
}
if (priority_queue.empty())
return;
size_t count = 0;
while (!priority_queue.empty())
String entry_str;
while (zookeeper->tryGet(zookeeper_path + "/log/log-" + padIndex(index), entry_str))
{
LogIterator iterator = priority_queue.top();
priority_queue.pop();
++count;
++index;
LogEntry entry = LogEntry::parse(iterator.entry_str);
LogEntry entry = LogEntry::parse(entry_str);
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", iterator.entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
replica_path + "/queue/queue-", entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
replica_path + "/log_pointer", toString(index), -1));
auto results = zookeeper->multi(ops);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
entry.addResultToVirtualParts(*this);
queue.push_back(entry);
}
++iterator.index;
if (iterator.readEntry(*zookeeper, zookeeper_path))
priority_queue.push(iterator);
if (next_update_event)
{
if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
next_update_event->set();
}
if (queue_task_handle)
queue_task_handle->wake();
LOG_DEBUG(log, "Pulled " << count << " entries to queue");
}
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) &&future_parts.count(entry.new_part_name))
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) && future_parts.count(entry.new_part_name))
{
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name <<
" because another log entry for the same part is being processed. This shouldn't happen often.");
@ -764,7 +759,7 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
else if (entry.type == LogEntry::MERGE_PARTS)
{
MergeTreeData::DataPartsVector parts;
bool have_all_parts = true;;
bool have_all_parts = true;
for (const String & name : entry.parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
@ -802,6 +797,8 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
}
}
auto table_lock = lockStructure(true);
MergeTreeData::Transaction transaction;
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, &transaction);
@ -898,24 +895,19 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
{
try
{
pullLogsToQueue();
pullLogsToQueue(queue_updating_event);
clearOldParts();
/// Каждую минуту выбрасываем ненужные записи из лога.
if (time(0) - clear_old_logs_time > 60)
{
clear_old_logs_time = time(0);
clearOldLogs();
}
queue_updating_event->wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
shutdown_event.tryWait(QUEUE_UPDATE_SLEEP_MS);
queue_updating_event->tryWait(ERROR_SLEEP_MS);
}
}
LOG_DEBUG(log, "queue updating thread finished");
}
bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & pool_context)
@ -1056,7 +1048,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
entry.parts_to_merge.push_back(part->name);
}
zookeeper->create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
/// (чтобы кусок добавился в virtual_parts).
@ -1092,23 +1084,33 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
if (!success)
merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
}
LOG_DEBUG(log, "merge selecting thread finished");
}
void StorageReplicatedMergeTree::clearOldBlocksThread()
void StorageReplicatedMergeTree::cleanupThread()
{
while (!shutdown_called && is_leader_node)
while (!shutdown_called)
{
try
{
clearOldBlocks();
clearOldParts();
if (is_leader_node)
{
clearOldLogs();
clearOldBlocks();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
shutdown_event.tryWait(60 * 1000);
shutdown_event.tryWait(CLEANUP_SLEEP_MS);
}
LOG_DEBUG(log, "cleanup thread finished");
}
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
@ -1148,7 +1150,6 @@ void StorageReplicatedMergeTree::becomeLeader()
LOG_INFO(log, "Became leader");
is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
clear_old_blocks_thread = std::thread(&StorageReplicatedMergeTree::clearOldBlocksThread, this);
}
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
@ -1217,9 +1218,12 @@ void StorageReplicatedMergeTree::shutdown()
restarting_thread.join();
return;
}
permanent_shutdown_called = true;
permanent_shutdown_event.set();
restarting_thread.join();
endpoint_holder = nullptr;
}
void StorageReplicatedMergeTree::partialShutdown()
@ -1227,6 +1231,9 @@ void StorageReplicatedMergeTree::partialShutdown()
leader_election = nullptr;
shutdown_called = true;
shutdown_event.set();
merge_selecting_event.set();
queue_updating_event->set();
alter_thread_event->set();
replica_is_active_node = nullptr;
merger.cancelAll();
@ -1237,14 +1244,15 @@ void StorageReplicatedMergeTree::partialShutdown()
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_event.set();
if (merge_selecting_thread.joinable())
merge_selecting_thread.join();
if (clear_old_blocks_thread.joinable())
clear_old_blocks_thread.join();
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
if (cleanup_thread.joinable())
cleanup_thread.join();
if (alter_thread.joinable())
alter_thread.join();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
@ -1258,31 +1266,8 @@ void StorageReplicatedMergeTree::goReadOnly()
is_read_only = true;
permanent_shutdown_called = true;
permanent_shutdown_event.set();
shutdown_called = true;
shutdown_event.set();
leader_election = nullptr;
replica_is_active_node = nullptr;
merger.cancelAll();
endpoint_holder = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_event.set();
if (merge_selecting_thread.joinable())
merge_selecting_thread.join();
if (clear_old_blocks_thread.joinable())
clear_old_blocks_thread.join();
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
LOG_TRACE(log, "Threads finished");
partialShutdown();
}
void StorageReplicatedMergeTree::startup()
@ -1300,6 +1285,7 @@ void StorageReplicatedMergeTree::startup()
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this);
queue_task_handle = context.getBackgroundPool().addTask(
std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1));
}
@ -1317,9 +1303,9 @@ void StorageReplicatedMergeTree::restartingThread()
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
/// Запретим писать в таблицу, пока подменяем zookeeper.
LOG_TRACE(log, "Locking all operations");
LOG_TRACE(log, "Locking INSERTs");
auto structure_lock = lockDataForAlter();
LOG_TRACE(log, "Locked all operations");
LOG_TRACE(log, "Locked INSERTs");
partialShutdown();
@ -1336,6 +1322,7 @@ void StorageReplicatedMergeTree::restartingThread()
tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
LOG_ERROR(log, "Exception in restartingThread. The storage will be read-only until server restart.");
goReadOnly();
LOG_DEBUG(log, "restarting thread finished");
return;
}
@ -1348,6 +1335,8 @@ void StorageReplicatedMergeTree::restartingThread()
{
tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
}
LOG_DEBUG(log, "restarting thread finished");
}
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
@ -1395,7 +1384,9 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
bool StorageReplicatedMergeTree::optimize()
{
/// Померджим какие-нибудь куски из директории unreplicated. TODO: Мерджить реплицируемые куски тоже.
/// Померджим какие-нибудь куски из директории unreplicated.
/// TODO: Мерджить реплицируемые куски тоже.
/// TODO: Не давать вызывать это из нескольких потоков сразу: один кусок может принять участие в нескольких несовместимых слияниях.
if (!unreplicated_data)
return false;