This commit is contained in:
Evgeniy Gatov 2014-07-04 20:43:57 +04:00
commit 2b9ac9b8ba
23 changed files with 521 additions and 421 deletions

View File

@ -49,6 +49,7 @@
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD 300 /// каждый период уменьшаем счетчик ошибок в 2 раза
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Максимальное время ожидания в очереди запросов.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 6
/// Используется в методе reserve, когда известно число строк, но неизвестны их размеры.
#define DBMS_APPROX_STRING_SIZE 64

View File

@ -17,6 +17,7 @@
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Users.h>
@ -95,6 +96,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Конфиг с секциями users, profiles и quotas.
InterserverIOHandler interserver_io_handler; /// Обработчик для межсерверной передачи данных.
String default_replica_name; /// Имя реплики из конфига.
BackgroundProcessingPoolPtr background_pool; /// Пул потоков для фоновой работы, выполняемой таблицами.
/// Кластеры для distributed таблиц
/// Создаются при создании Distributed таблиц, так как нужно дождаться пока будут выставлены Settings
@ -317,6 +319,8 @@ public:
void setMarkCache(size_t cache_size_in_bytes);
MarkCachePtr getMarkCache() const;
BackgroundProcessingPool & getBackgroundPool();
/** Очистить кэши разжатых блоков и засечек.
* Обычно это делается при переименовании таблиц, изменении типа столбцов, удалении таблицы.
* - так как кэши привязаны к именам файлов, и становятся некорректными.

View File

@ -66,6 +66,9 @@ struct Settings
M(SettingBool, use_splitting_aggregator, false) \
/** Следует ли отменять выполняющийся запрос с таким же id, как новый. */ \
M(SettingBool, replace_running_query, false) \
/** Количество потоков, выполняющих фоновую работу для таблиц (например, слияние в merge tree). \
* TODO: Сейчас применяется только при запуске сервера. Можно сделать изменяемым динамически. */ \
M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE) \
\
M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \
\

View File

@ -3,11 +3,14 @@
#include <thread>
#include <set>
#include <map>
#include <list>
#include <Poco/Mutex.h>
#include <Poco/RWLock.h>
#include <Poco/Event.h>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
@ -46,12 +49,40 @@ public:
/// Возвращает true, если что-то получилось сделать. В таком случае поток не будет спать перед следующим вызовом.
typedef std::function<bool (Context & context)> Task;
typedef std::shared_ptr<void> TaskHandle;
BackgroundProcessingPool() : size(1), sleep_seconds(1), shutdown(false) {}
class TaskInfo
{
public:
/// Переставить таск в начало очереди и разбудить какой-нибудь поток.
void wake()
{
Poco::ScopedLock<Poco::FastMutex> lock(pool.mutex);
pool.tasks.splice(pool.tasks.begin(), pool.tasks, iterator);
pool.wake_event.set();
}
private:
friend class BackgroundProcessingPool;
BackgroundProcessingPool & pool;
Task function;
Poco::RWLock lock;
volatile bool removed;
std::list<std::shared_ptr<TaskInfo>>::iterator iterator;
TaskInfo(BackgroundProcessingPool & pool_, const Task & function_) : pool(pool_), function(function_), removed(false) {}
};
typedef std::shared_ptr<TaskInfo> TaskHandle;
BackgroundProcessingPool(int size_) : size(size_), sleep_seconds(10), shutdown(false) {}
void setNumberOfThreads(int size_)
{
if (size_ <= 0)
throw Exception("Invalid number of threads: " + toString(size_), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
Poco::ScopedLock<Poco::FastMutex> tlock(threads_mutex);
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
@ -89,11 +120,12 @@ public:
{
Poco::ScopedLock<Poco::FastMutex> lock(threads_mutex);
TaskInfoPtr res = std::make_shared<TaskInfo>(task);
TaskHandle res(new TaskInfo(*this, task));
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
tasks.push_back(res);
res->iterator = --tasks.end();
}
if (threads.empty())
@ -108,12 +140,10 @@ public:
return res;
}
void removeTask(const TaskHandle & handle)
void removeTask(const TaskHandle & task)
{
Poco::ScopedLock<Poco::FastMutex> tlock(threads_mutex);
TaskInfoPtr task = std::static_pointer_cast<TaskInfo>(handle);
/// Дождемся завершения всех выполнений этой задачи.
{
Poco::ScopedWriteRWLock wlock(task->lock);
@ -147,6 +177,7 @@ public:
{
LOG_ERROR(&Logger::get("~BackgroundProcessingPool"), "Destroying non-empty BackgroundProcessingPool");
shutdown = true;
wake_event.set(); /// NOTE: это разбудит только один поток. Лучше было бы разбудить все.
for (std::thread & thread : threads)
thread.join();
}
@ -158,33 +189,21 @@ public:
}
private:
struct TaskInfo
{
Task function;
Poco::RWLock lock;
volatile bool removed;
TaskInfo(const Task & function_) : function(function_), removed(false) {}
};
typedef std::shared_ptr<TaskInfo> TaskInfoPtr;
typedef std::vector<TaskInfoPtr> Tasks;
typedef std::list<TaskHandle> Tasks;
typedef std::vector<std::thread> Threads;
Poco::FastMutex threads_mutex;
Poco::FastMutex mutex;
int size;
Tasks tasks;
Tasks tasks; /// Таски в порядке, в котором мы планируем их выполнять.
Threads threads;
Poco::Event wake_event;
Counters counters;
double sleep_seconds;
bool shutdown;
void threadFunction()
{
/// Начнем со случайной задачи (качество rand() не имеет значения).
size_t i = static_cast<size_t>(rand());
while (!shutdown)
{
Counters counters_diff;
@ -193,18 +212,16 @@ private:
try
{
TaskInfoPtr task;
TaskHandle task;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
tasks_count = tasks.size();
if (!tasks.empty())
{
need_sleep = true;
i %= tasks_count;
task = tasks[i];
++i;
task = tasks.front();
tasks.splice(tasks.end(), tasks, tasks.begin());
}
}
@ -225,9 +242,15 @@ private:
if (task->function(context))
{
/// Если у таска получилось выполнить какую-то работу, запустим его же снова без паузы.
--i;
need_sleep = false;
/// Если у таска получилось выполнить какую-то работу, запустим его снова без паузы.
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
auto it = std::find(tasks.begin(), tasks.end(), task);
if (it != tasks.end())
{
need_sleep = false;
tasks.splice(tasks.begin(), tasks, it);
}
}
}
catch (...)
@ -251,10 +274,12 @@ private:
if (need_sleep)
{
std::this_thread::sleep_for(std::chrono::duration<double>(sleep_seconds / tasks_count));
wake_event.tryWait(sleep_seconds * 1000. / tasks_count);
}
}
}
};
typedef Poco::SharedPtr<BackgroundProcessingPool> BackgroundProcessingPoolPtr;
}

View File

@ -22,6 +22,7 @@ public:
UInt64 temp_index = storage.increment.get();
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, temp_index);
storage.data.renameTempPartAndAdd(part, &storage.increment);
storage.merge_task_handle->wake();
}
}

View File

@ -79,13 +79,8 @@ struct MergeTreeSettings
/// Во столько раз ночью увеличиваем коэффициент.
size_t merge_parts_at_night_inc = 10;
/// Сколько потоков использовать для объединения кусков (для MergeTree).
/// Пул потоков общий на весь сервер.
size_t merging_threads = 6;
/// Сколько потоков использовать для загрузки кусков с других реплик и объединения кусков (для ReplicatedMergeTree).
/// Пул потоков на каждую таблицу свой.
size_t replication_threads = 4;
/// Сколько заданий на слияние кусков разрешено одновременно иметь в очереди ReplicatedMergeTree.
size_t max_replicated_merges_in_queue = 6;
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read = 20 * 8192;
@ -353,6 +348,43 @@ public:
typedef std::vector<DataPartPtr> DataPartsVector;
/// Некоторые операции над множеством кусков могут возвращать такой объект.
/// Если не был вызван commit, деструктор откатывает операцию.
class Transaction : private boost::noncopyable
{
public:
Transaction() {}
void commit()
{
data = nullptr;
removed_parts.clear();
added_parts.clear();
}
~Transaction()
{
try
{
if (data && (!removed_parts.empty() || !added_parts.empty()))
{
LOG_DEBUG(data->log, "Undoing transaction");
data->replaceParts(removed_parts, added_parts);
}
}
catch(...)
{
tryLogCurrentException("~MergeTreeData::Transaction");
}
}
private:
friend class MergeTreeData;
MergeTreeData * data = nullptr;
DataPartsVector removed_parts;
DataPartsVector added_parts;
};
/// Режим работы. См. выше.
enum Mode
{
@ -422,13 +454,18 @@ public:
/** Переименовывает временный кусок в постоянный и добавляет его в рабочий набор.
* Если increment!=nullptr, индекс куска берется из инкремента. Иначе индекс куска не меняется.
* Предполагается, что кусок не пересекается с существующими.
* Если out_transaction не nullptr, присваивает туда объект, позволяющий откатить добавление куска (но не переименование).
*/
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment = nullptr);
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
/** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски.
* Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке).
*/
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr);
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
/** Убирает из рабочего набора куски remove и добавляет куски add.
*/
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add);
/** Переименовывает кусок в prefix_кусок и убирает его из рабочего набора.
* Лучше использовать только когда никто не может читать или писать этот кусок

View File

@ -35,7 +35,8 @@ public:
const AllowedMergingPredicate & can_merge);
/// Сливает куски.
MergeTreeData::DataPartPtr mergeParts(const MergeTreeData::DataPartsVector & parts, const String & merged_name);
MergeTreeData::DataPartPtr mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeTreeData::Transaction * out_transaction = nullptr);
/// Примерное количество места на диске, нужное для мерджа. С запасом.
size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);

View File

@ -50,71 +50,69 @@ public:
* В блоке должно быть либо ни одного столбца из column_names, либо все, для которых есть файлы. */
void readRange(size_t from_mark, size_t to_mark, Block & res)
{
size_t max_rows_to_read = (to_mark - from_mark) * storage.index_granularity;
/** Для некоторых столбцов файлы с данными могут отсутствовать.
* Это бывает для старых кусков, после добавления новых столбцов в структуру таблицы.
*/
bool has_missing_columns = false;
/// Указатели на столбцы смещений, общие для столбцов из вложенных структур данных
/// Если append, все значения nullptr, и offset_columns используется только для проверки, что столбец смещений уже прочитан.
OffsetColumns offset_columns;
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
try
{
if (streams.end() == streams.find(*it))
size_t max_rows_to_read = (to_mark - from_mark) * storage.index_granularity;
/** Для некоторых столбцов файлы с данными могут отсутствовать.
* Это бывает для старых кусков, после добавления новых столбцов в структуру таблицы.
*/
bool has_missing_columns = false;
/// Указатели на столбцы смещений, общие для столбцов из вложенных структур данных
/// Если append, все значения nullptr, и offset_columns используется только для проверки, что столбец смещений уже прочитан.
OffsetColumns offset_columns;
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
has_missing_columns = true;
continue;
}
if (streams.end() == streams.find(*it))
{
has_missing_columns = true;
continue;
}
/// Все столбцы уже есть в блоке. Будем добавлять значения в конец.
bool append = res.has(*it);
/// Все столбцы уже есть в блоке. Будем добавлять значения в конец.
bool append = res.has(*it);
ColumnWithNameAndType column;
column.name = *it;
column.type = storage.getDataTypeByName(*it);
if (append)
column.column = res.getByName(column.name).column;
ColumnWithNameAndType column;
column.name = *it;
column.type = storage.getDataTypeByName(*it);
if (append)
column.column = res.getByName(column.name).column;
bool read_offsets = true;
bool read_offsets = true;
/// Для вложенных структур запоминаем указатели на столбцы со смещениями
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&*column.type))
{
String name = DataTypeNested::extractNestedTableName(column.name);
/// Для вложенных структур запоминаем указатели на столбцы со смещениями
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&*column.type))
{
String name = DataTypeNested::extractNestedTableName(column.name);
if (offset_columns.count(name) == 0)
offset_columns[name] = append ? NULL : new ColumnArray::ColumnOffsets_t;
else
read_offsets = false; /// на предыдущих итерациях смещения уже считали вызовом readData
if (offset_columns.count(name) == 0)
offset_columns[name] = append ? NULL : new ColumnArray::ColumnOffsets_t;
else
read_offsets = false; /// на предыдущих итерациях смещения уже считали вызовом readData
if (!append)
column.column = new ColumnArray(type_arr->getNestedType()->createColumn(), offset_columns[name]);
}
else if (!append)
column.column = column.type->createColumn();
if (!append)
column.column = new ColumnArray(type_arr->getNestedType()->createColumn(), offset_columns[name]);
}
else if (!append)
column.column = column.type->createColumn();
try
{
readData(column.name, *column.type, *column.column, from_mark, max_rows_to_read, 0, read_offsets);
}
catch (const Exception & e)
{
/// Более хорошая диагностика.
if (e.code() == ErrorCodes::CHECKSUM_DOESNT_MATCH || e.code() == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED)
throw Exception(e.message() + " (while reading column " + *it + " from part " + path + ")", e.code());
else
throw;
if (!append && column.column->size())
res.insert(column);
}
if (!append && column.column->size())
res.insert(column);
if (has_missing_columns && !res)
throw Exception("All requested columns are missing", ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING);
}
catch (const Exception & e)
{
/// Более хорошая диагностика.
throw Exception(e.message() + " (while reading from part " + path + " from mark " + toString(from_mark) + " to "
+ toString(to_mark) + ")", e.code());
}
if (has_missing_columns && !res)
throw Exception("All requested columns are missing", ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING);
}
/// Заполняет столбцы, которых нет в блоке, значениями по умолчанию.
@ -178,10 +176,7 @@ public:
catch (const Exception & e)
{
/// Более хорошая диагностика.
if (e.code() == ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH)
throw Exception(e.message() + " (while reading from part " + path + ")", e.code());
else
throw;
throw Exception(e.message() + " (while reading from part " + path + ")", e.code());
}
}

View File

@ -45,7 +45,8 @@ public:
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id);
storage.data.renameTempPartAndAdd(part);
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
@ -81,39 +82,36 @@ public:
block_number_lock.getUnlockOps(ops);
auto code = storage.zookeeper->tryMulti(ops);
if (code != ZOK)
if (code == ZOK)
{
if (code == ZNODEEXISTS)
transaction.commit();
storage.merge_selecting_event.set();
}
else if (code == ZNODEEXISTS)
{
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
/// Если блок с таким ID уже есть в таблице, не будем его вставлять, и удалим только что записанные данные.
/// NOTE: В короткое время между renameTempPartAndAdd и deletePart в таблице на этой реплике доступны
/// продублированные данные.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
auto found_checksums = part->checksums;
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
storage.data.deletePart(part);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
}
else
{
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
}
else
{
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
else
{
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
}

View File

@ -5,7 +5,6 @@
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
namespace DB
{
@ -27,7 +26,7 @@ public:
*/
static StoragePtr create(const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
@ -84,6 +83,8 @@ private:
String full_path;
Increment increment;
BackgroundProcessingPool & background_pool;
MergeTreeData data;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
@ -96,7 +97,6 @@ private:
volatile bool shutdown_called;
static BackgroundProcessingPool merge_pool;
BackgroundProcessingPool::TaskHandle merge_task_handle;
/// Пока существует, помечает части как currently_merging и держит резерв места.
@ -110,7 +110,7 @@ private:
CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_)
: parts(parts_), storage(storage_)
{
/// Здесь не лочится мьютекс, так как конструктор вызывается внутри mergeThread, где он уже залочен.
/// Здесь не лочится мьютекс, так как конструктор вызывается внутри mergeTask, где он уже залочен.
reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// Может бросить исключение.
for (const auto & part : parts)
{
@ -143,7 +143,7 @@ private:
StorageMergeTree(const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.

View File

@ -156,7 +156,6 @@ private:
typedef std::list<LogEntry> LogEntries;
typedef std::set<String> StringSet;
typedef std::vector<std::thread> Threads;
Context & context;
zkutil::ZooKeeperPtr zookeeper;
@ -214,11 +213,13 @@ private:
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;
/// Потоки, выполняющие действия из очереди.
Threads queue_threads;
/// Задание, выполняющее действия из очереди.
BackgroundProcessingPool::TaskHandle queue_task_handle;
/// Поток, выбирающий куски для слияния.
std::thread merge_selecting_thread;
Poco::Event merge_selecting_event;
/// Поток, удаляющий информацию о старых блоках из ZooKeeper.
std::thread clear_old_blocks_thread;
@ -232,8 +233,10 @@ private:
/// Нужно ли завершить фоновые потоки (кроме restarting_thread).
volatile bool shutdown_called = false;
Poco::Event shutdown_event;
/// Нужно ли завершить restarting_thread.
volatile bool permanent_shutdown_called = false;
Poco::Event permanent_shutdown_event;
StorageReplicatedMergeTree(
const String & zookeeper_path_,
@ -319,15 +322,15 @@ private:
/** Выполнить действие из очереди. Бросает исключение, если что-то не так.
*/
void executeLogEntry(const LogEntry & entry);
void executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context);
/** В бесконечном цикле обновляет очередь.
*/
void queueUpdatingThread();
/** В бесконечном цикле выполняет действия из очереди.
/** Выполняет действия из очереди.
*/
void queueThread();
bool queueTask(BackgroundProcessingPool::Context & context);
/// Выбор кусков для слияния.

View File

@ -544,6 +544,14 @@ MarkCachePtr Context::getMarkCache() const
return shared->mark_cache;
}
BackgroundProcessingPool & Context::getBackgroundPool()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->background_pool)
shared->background_pool = new BackgroundProcessingPool(settings.background_pool_size);
return *shared->background_pool;
}
void Context::resetCaches() const
{
/// Исходим из допущения, что функции setUncompressedCache, setMarkCache, если вызывались, то раньше (при старте сервера). Иначе поставьте mutex.

View File

@ -584,9 +584,9 @@ void MergeTreeData::commitAlterModify(const ASTAlterQuery::Parameters & params)
}
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment, Transaction * out_transaction)
{
auto removed = renameTempPartAndReplace(part, increment);
auto removed = renameTempPartAndReplace(part, increment, out_transaction);
if (!removed.empty())
{
LOG_ERROR(log, "Added part " << part->name << + " covers " << toString(removed.size())
@ -594,8 +594,12 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * in
}
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment)
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr part, Increment * increment, Transaction * out_transaction)
{
if (out_transaction && out_transaction->data)
throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid");
LOG_TRACE(log, "Renaming " << part->name << ".");
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
@ -665,9 +669,33 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDa
all_data_parts.insert(part);
if (out_transaction)
{
out_transaction->data = this;
out_transaction->added_parts = res;
out_transaction->removed_parts = DataPartsVector(1, part);
}
return res;
}
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add)
{
LOG_TRACE(log, "Removing " << remove.size() << " parts and adding " << add.size() << " parts.");
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
for (const DataPartPtr & part : remove)
{
part->remove_time = time(0);
data_parts.erase(part);
}
for (const DataPartPtr & part : add)
{
data_parts.insert(part);
}
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

View File

@ -248,9 +248,10 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
/// parts должны быть отсортированы.
MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & parts, const String & merged_name)
MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeTreeData::Transaction * out_transaction)
{
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
Names all_column_names;
NamesAndTypesList columns_list = data.getColumnsList();
@ -329,7 +330,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part_tmp_path);
/// Переименовываем новый кусок, добавляем в набор и убираем исходные куски.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part);
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
if (new_data_part->name != merged_name)
LOG_ERROR(log, "Unexpected part name: " << new_data_part->name << " instead of " << merged_name);

View File

@ -89,13 +89,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
Poco::File(part_tmp_path).createDirectories();
LOG_TRACE(log, "Calculating primary expression.");
/// Если для сортировки надо вычислить некоторые столбцы - делаем это.
data.getPrimaryExpression()->execute(block);
LOG_TRACE(log, "Sorting by primary key.");
SortDescription sort_descr = data.getSortDescription();
/// Сортируем.

View File

@ -6,12 +6,9 @@
namespace DB
{
BackgroundProcessingPool StorageMergeTree::merge_pool;
StorageMergeTree::StorageMergeTree(const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
@ -20,6 +17,7 @@ StorageMergeTree::StorageMergeTree(const String & path_, const String & database
const String & sign_column_,
const MergeTreeSettings & settings_)
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), increment(full_path + "increment.txt"),
background_pool(context_.getBackgroundPool()),
data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_,mode_, sign_column_, settings_, database_name_ + "." + name),
reader(data), writer(data), merger(data),
@ -34,7 +32,7 @@ StorageMergeTree::StorageMergeTree(const String & path_, const String & database
StoragePtr StorageMergeTree::create(
const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
@ -48,9 +46,7 @@ StoragePtr StorageMergeTree::create(
sampling_expression_, index_granularity_, mode_, sign_column_, settings_);
StoragePtr res_ptr = res->thisPtr();
merge_pool.setNumberOfThreads(res->data.settings.merging_threads);
merge_pool.setSleepTime(5);
res->merge_task_handle = merge_pool.addTask(std::bind(&StorageMergeTree::mergeTask, res, std::placeholders::_1));
res->merge_task_handle = res->background_pool.addTask(std::bind(&StorageMergeTree::mergeTask, res, std::placeholders::_1));
return res_ptr;
}
@ -61,7 +57,7 @@ void StorageMergeTree::shutdown()
return;
shutdown_called = true;
merger.cancelAll();
merge_pool.removeTask(merge_task_handle);
background_pool.removeTask(merge_task_handle);
}
@ -142,8 +138,8 @@ bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
/// Если слияние запущено из пула потоков, и хотя бы половина потоков сливает большие куски,
/// не будем сливать большие куски.
int big_merges = merge_pool.getCounter("big merges");
bool only_small = pool_context && big_merges * 2 >= merge_pool.getNumberOfThreads();
int big_merges = background_pool.getCounter("big merges");
bool only_small = pool_context && big_merges * 2 >= background_pool.getNumberOfThreads();
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))

View File

@ -10,11 +10,11 @@ namespace DB
{
const auto QUEUE_UPDATE_SLEEP = std::chrono::seconds(5);
const auto QUEUE_UPDATE_SLEEP_MS = 5 * 1000;
const auto QUEUE_NO_WORK_SLEEP = std::chrono::seconds(5);
const auto QUEUE_ERROR_SLEEP = std::chrono::seconds(1);
const auto QUEUE_AFTER_WORK_SLEEP = std::chrono::seconds(0);
const auto MERGE_SELECTING_SLEEP = std::chrono::seconds(5);
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
@ -38,7 +38,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)"))
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
shutdown_event(false), permanent_shutdown_event(false)
{
if (!zookeeper)
{
@ -232,14 +233,16 @@ void StorageReplicatedMergeTree::createReplica()
bool active = true;
while(true)
{
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
zkutil::EventPtr event = new Poco::Event;
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active", nullptr, event))
{
active = false;
break;
}
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name))
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, nullptr, event))
break;
std::this_thread::sleep_for(std::chrono::seconds(1));
event->wait();
}
/// Будем предпочитать активную реплику в качестве эталонной.
@ -256,8 +259,9 @@ void StorageReplicatedMergeTree::createReplica()
/// Скопируем у эталонной реплики ссылки на все логи.
for (const String & replica : replicas)
{
String pointer = zookeeper->get(source_path + "/log_pointers/" + replica);
zookeeper->create(replica_path + "/log_pointers/" + replica, pointer, zkutil::CreateMode::Persistent);
String pointer;
if (zookeeper->tryGet(source_path + "/log_pointers/" + replica, pointer))
zookeeper->create(replica_path + "/log_pointers/" + replica, pointer, zkutil::CreateMode::Persistent);
}
/// Запомним очередь эталонной реплики.
@ -510,7 +514,7 @@ void StorageReplicatedMergeTree::clearOldParts()
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
int32_t code = zookeeper->tryMulti(ops);
if (code != ZOK)
LOG_WARNING(log, "Couldn't remove part " << name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
LOG_DEBUG(log, "Couldn't remove part " << name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
}
if (!parts.empty())
@ -666,6 +670,9 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
priority_queue.push(iterator);
}
if (priority_queue.empty())
return;
size_t count = 0;
while (!priority_queue.empty())
@ -694,12 +701,19 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
priority_queue.push(iterator);
}
if (count > 0)
LOG_DEBUG(log, "Pulled " << count << " entries to queue");
queue_task_handle->wake();
LOG_DEBUG(log, "Pulled " << count << " entries to queue");
}
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) &&future_parts.count(entry.new_part_name))
{
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name <<
" because another log entry for the same part is being processed. This shouldn't happen often.");
return false;
}
if (entry.type == LogEntry::MERGE_PARTS)
{
/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
@ -720,7 +734,7 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
return true;
}
void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
{
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS)
@ -776,12 +790,26 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
else
{
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);
/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
for (const auto & part : parts)
{
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
pool_context.incrementCounter("big merges");
pool_context.incrementCounter("replicated big merges");
break;
}
}
MergeTreeData::Transaction transaction;
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();
merge_selecting_event.set();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
@ -885,91 +913,77 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP);
shutdown_event.tryWait(QUEUE_UPDATE_SLEEP_MS);
}
}
void StorageReplicatedMergeTree::queueThread()
bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & pool_context)
{
while (!shutdown_called)
{
LogEntry entry;
bool have_work = false;
LogEntry entry;
bool have_work = false;
try
try
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
bool empty = queue.empty();
if (!empty)
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
bool empty = queue.empty();
if (!empty)
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
if (shouldExecuteLogEntry(*it))
{
if (shouldExecuteLogEntry(*it))
{
entry = *it;
entry.tagPartAsFuture(*this);
queue.erase(it);
have_work = true;
break;
}
entry = *it;
entry.tagPartAsFuture(*this);
queue.erase(it);
have_work = true;
break;
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!have_work)
{
std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
continue;
}
bool success = false;
try
{
executeLogEntry(entry);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". There must be a bug somewhere. Ignoring it.");
success = true;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown_called)
break;
if (success)
{
std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP);
}
else
{
{
/// Добавим действие, которое не получилось выполнить, в конец очереди.
entry.future_part_tagger = nullptr;
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
queue.push_back(entry);
}
std::this_thread::sleep_for(QUEUE_ERROR_SLEEP);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!have_work)
return false;
bool success = false;
try
{
executeLogEntry(entry, pool_context);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". There must be a bug somewhere. Ignoring it.");
success = true;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!success)
{
/// Добавим действие, которое не получилось выполнить, в конец очереди.
entry.future_part_tagger = nullptr;
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
queue.push_back(entry);
}
return success;
}
void StorageReplicatedMergeTree::mergeSelectingThread()
@ -983,10 +997,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
try
{
size_t merges_queued = 0;
/// Есть ли в очереди мердж крупных кусков.
/// TODO: Если мердж уже выполняется, его нет в очереди, но здесь нужно все равно как-то о нем узнать.
bool has_big_merge = false;
/// Есть ли в очереди или в фоновом потоке мердж крупных кусков.
bool has_big_merge = context.getBackgroundPool().getCounter("replicated big merges") > 0;
if (!has_big_merge)
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
@ -1014,42 +1028,35 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
}
}
if (merges_queued >= data.settings.merging_threads)
do
{
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
continue;
}
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
break;
MergeTreeData::DataPartsVector parts;
MergeTreeData::DataPartsVector parts;
{
String merged_name;
auto can_merge = std::bind(
&StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
if (merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
false, false, has_big_merge, can_merge) ||
merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
if (!merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
false, false, has_big_merge, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
true, false, has_big_merge, can_merge))
break;
LogEntry entry;
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
for (const auto & part : parts)
{
LogEntry entry;
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
for (const auto & part : parts)
{
entry.parts_to_merge.push_back(part->name);
}
zookeeper->create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
success = true;
entry.parts_to_merge.push_back(part->name);
}
}
if (success)
{
zookeeper->create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
/// (чтобы кусок добавился в virtual_parts).
pullLogsToQueue();
@ -1068,7 +1075,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
zookeeper->tryRemove(path);
}
}
success = true;
}
while(false);
}
catch (...)
{
@ -1079,7 +1089,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
break;
if (!success)
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
}
}
@ -1096,14 +1106,7 @@ void StorageReplicatedMergeTree::clearOldBlocksThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду.
/// TODO: Лучше во всех подобных местах использовать condition variable.
for (size_t i = 0; i < 60; ++i)
{
if (shutdown_called || !is_leader_node)
break;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
shutdown_event.tryWait(60 * 1000);
}
}
@ -1114,6 +1117,11 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr
virtual_parts.getContainingPart(right->name) != right->name)
return false;
/// Если о каком-то из кусков нет информации в ZK, не будем сливать.
if (!zookeeper->exists(replica_path + "/parts/" + left->name) ||
!zookeeper->exists(replica_path + "/parts/" + right->name))
return false;
String month_name = left->name.substr(0, 6);
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
@ -1178,12 +1186,16 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
assertEOF(buf);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
auto removed_parts = data.renameTempPartAndReplace(part);
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();
merge_selecting_event.set();
for (const auto & removed_part : removed_parts)
{
@ -1193,7 +1205,7 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part");
LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_name);
}
void StorageReplicatedMergeTree::shutdown()
@ -1205,6 +1217,7 @@ void StorageReplicatedMergeTree::shutdown()
return;
}
permanent_shutdown_called = true;
permanent_shutdown_event.set();
restarting_thread.join();
}
@ -1212,6 +1225,7 @@ void StorageReplicatedMergeTree::partialShutdown()
{
leader_election = nullptr;
shutdown_called = true;
shutdown_event.set();
replica_is_active_node = nullptr;
merger.cancelAll();
@ -1222,6 +1236,7 @@ void StorageReplicatedMergeTree::partialShutdown()
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_event.set();
if (merge_selecting_thread.joinable())
merge_selecting_thread.join();
if (clear_old_blocks_thread.joinable())
@ -1229,9 +1244,8 @@ void StorageReplicatedMergeTree::partialShutdown()
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
for (auto & thread : queue_threads)
thread.join();
queue_threads.clear();
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
LOG_TRACE(log, "Threads finished");
}
@ -1240,8 +1254,10 @@ void StorageReplicatedMergeTree::goReadOnly()
LOG_INFO(log, "Going to read-only mode");
is_read_only = true;
shutdown_called = true;
permanent_shutdown_called = true;
permanent_shutdown_event.set();
shutdown_called = true;
shutdown_event.set();
leader_election = nullptr;
replica_is_active_node = nullptr;
@ -1253,6 +1269,7 @@ void StorageReplicatedMergeTree::goReadOnly()
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_event.set();
if (merge_selecting_thread.joinable())
merge_selecting_thread.join();
if (clear_old_blocks_thread.joinable())
@ -1260,15 +1277,15 @@ void StorageReplicatedMergeTree::goReadOnly()
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
for (auto & thread : queue_threads)
thread.join();
queue_threads.clear();
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
LOG_TRACE(log, "Threads finished");
}
void StorageReplicatedMergeTree::startup()
{
shutdown_called = false;
shutdown_event.reset();
merger.uncancelAll();
if (unreplicated_merger)
@ -1280,8 +1297,7 @@ void StorageReplicatedMergeTree::startup()
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
for (size_t i = 0; i < data.settings.replication_threads; ++i)
queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
queue_task_handle = context.getBackgroundPool().addTask(std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1));
}
void StorageReplicatedMergeTree::restartingThread()
@ -1308,7 +1324,7 @@ void StorageReplicatedMergeTree::restartingThread()
startup();
}
std::this_thread::sleep_for(std::chrono::seconds(2));
permanent_shutdown_event.tryWait(60 * 1000);
}
}
catch (...)
@ -1404,6 +1420,8 @@ void StorageReplicatedMergeTree::drop()
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->removeRecursive(zookeeper_path);
}
data.dropAllData();
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const

View File

@ -64,6 +64,7 @@ public:
~LeaderElection()
{
shutdown = true;
event->set();
thread.join();
}
@ -78,6 +79,7 @@ private:
std::thread thread;
volatile bool shutdown;
zkutil::EventPtr event = new Poco::Event();
State state;
@ -85,9 +87,11 @@ private:
void threadFunction()
{
try
while (!shutdown)
{
while (!shutdown)
bool success = false;
try
{
Strings children = zookeeper.getChildren(path);
std::sort(children.begin(), children.end());
@ -102,33 +106,33 @@ private:
return;
}
WatchFuture future;
if (zookeeper.exists(path + "/" + *(it - 1), nullptr, &future))
{
while (!shutdown)
if (future.wait_for(std::chrono::seconds(2)) != std::future_status::timeout)
break;
}
if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event))
event->tryWait(60 * 1000);
success = true;
}
}
catch (const DB::Exception & e)
{
LOG_ERROR(log, "Exception in LeaderElection: Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception in LeaderElection: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception in LeaderElection: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception in LeaderElection");
catch (const DB::Exception & e)
{
LOG_ERROR(log, "Exception in LeaderElection: Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception in LeaderElection: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception in LeaderElection: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception in LeaderElection");
}
if (!success)
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
};

View File

@ -5,6 +5,8 @@
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <zookeeper/zookeeper.h>
#include <Poco/SharedPtr.h>
#include <Poco/Event.h>
namespace zkutil
{
@ -95,16 +97,6 @@ namespace CreateMode
extern const int PersistentSequential;
}
struct WatchEventInfo
{
int32_t event;
int32_t state;
std::string path;
typedef Poco::SharedPtr<Poco::Event> EventPtr;
WatchEventInfo() {}
WatchEventInfo(int32_t event_, int32_t state_, const char * path_)
: event(event_), state(state_), path(path_) {}
};
typedef std::future<WatchEventInfo> WatchFuture;
}

View File

@ -12,21 +12,17 @@ namespace zkutil
const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
const UInt32 DEFAULT_RETRY_NUM = 3;
struct WatchWithPromise;
struct WatchWithEvent;
/// Из-за вызова С кода легче самому явно управлять памятью
typedef WatchWithPromise * WatchWithPromisePtr;
typedef WatchWithEvent * WatchWithEventPtr;
/** Сессия в ZooKeeper. Интерфейс существенно отличается от обычного API ZooKeeper.
* Вместо callback-ов для watch-ей используются std::future.
*
* Вместо callback-ов для watch-ей используются Poco::Event. Для указанного события вызывается set() только при первом вызове watch.
* Методы на чтение при восстанавливаемых ошибках OperationTimeout, ConnectionLoss пытаются еще retry_num раз.
* Методы на запись не пытаются повторить при восстанавливаемых ошибках, т.к. это приводит к проблеммам типа удаления дважды одного и того же.
*
* Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке.
*
* Методы с названиями, начинающимися с try, не бросают исключение только при перечисленных видах ошибок.
* Например, исключение бросается в любом случае, если сессия разорвалась или если не хватает прав или ресурсов.
*/
class ZooKeeper
{
@ -109,14 +105,14 @@ public:
/// Если есть проблемы с сетью может сам удалить ноду и вернуть ZNONODE
int32_t tryRemoveWithRetries(const std::string & path, int32_t version = -1);
bool exists(const std::string & path, Stat * stat = nullptr, WatchFuture * watch = nullptr);
bool exists(const std::string & path, Stat * stat = nullptr, EventPtr watch = nullptr);
std::string get(const std::string & path, Stat * stat = nullptr, WatchFuture * watch = nullptr);
std::string get(const std::string & path, Stat * stat = nullptr, EventPtr watch = nullptr);
/** Не бросает исключение при следующих ошибках:
* - Такой ноды нет. В таком случае возвращает false.
*/
bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, WatchFuture * watch = nullptr);
bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, EventPtr watch = nullptr);
void set(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
@ -132,14 +128,14 @@ public:
Strings getChildren(const std::string & path,
Stat * stat = nullptr,
WatchFuture * watch = nullptr);
EventPtr watch = nullptr);
/** Не бросает исключение при следующих ошибках:
* - Такой ноды нет.
*/
int32_t tryGetChildren(const std::string & path, Strings & res,
Stat * stat = nullptr,
WatchFuture * watch = nullptr);
EventPtr watch = nullptr);
/** Транзакционно выполняет несколько операций. При любой ошибке бросает исключение.
*/
@ -166,10 +162,13 @@ public:
/// На самом деле размер меньше, но для удобства округлим в верхнюю сторону
static const size_t SEQUENTIAL_SUFFIX_SIZE = 64;
private:
friend struct WatchWithEvent;
void init(const std::string & hosts, int32_t sessionTimeoutMs, WatchFunction * watch_);
void removeChildrenRecursive(const std::string & path);
WatchWithPromisePtr watchForFuture(WatchFuture * future);
static void processPromise(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx);
void * watchForEvent(EventPtr event);
watcher_fn callbackForEvent(EventPtr event);
static void processEvent(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx);
template <class T>
int32_t retry(const T & operation)
@ -190,14 +189,14 @@ private:
/// методы не бросают исключений, а возвращают коды ошибок
int32_t createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated);
int32_t removeImpl(const std::string & path, int32_t version = -1);
int32_t getImpl(const std::string & path, std::string & res, Stat * stat = nullptr, WatchFuture * watch = nullptr);
int32_t getImpl(const std::string & path, std::string & res, Stat * stat = nullptr, EventPtr watch = nullptr);
int32_t setImpl(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
int32_t getChildrenImpl(const std::string & path, Strings & res,
Stat * stat = nullptr,
WatchFuture * watch = nullptr);
EventPtr watch = nullptr);
int32_t multiImpl(const Ops & ops, OpResultsPtr * out_results = nullptr);
int32_t existsImpl(const std::string & path, Stat * stat_, WatchFuture * watch = nullptr);
int32_t existsImpl(const std::string & path, Stat * stat_, EventPtr watch = nullptr);
std::string hosts;
int32_t sessionTimeoutMs;
@ -207,7 +206,7 @@ private:
zhandle_t * impl;
WatchFunction * state_watch;
std::unordered_set<WatchWithPromise *> watch_store;
std::unordered_set<WatchWithEvent *> watch_store;
/// Количество попыток повторить операцию чтения при OperationTimeout, ConnectionLoss
size_t retry_num = 3;

View File

@ -22,49 +22,38 @@ void check(int32_t code, const std::string path = "")
}
}
typedef std::promise<WatchEventInfo> WatchPromise;
struct WatchWithPromise
struct WatchWithEvent
{
WatchPromise promise;
bool notified;
/// существует все время существования WatchWithPromise
/// существует все время существования WatchWithEvent
ZooKeeper & zk;
EventPtr event;
bool notified = false;
WatchWithPromise(ZooKeeper & zk_) : notified(false), zk(zk_) {}
WatchWithEvent(ZooKeeper & zk_, EventPtr event_) : zk(zk_), event(event_) {}
void process(zhandle_t * zh, int32_t event, int32_t state, const char * path)
void process(zhandle_t * zh, int32_t event_type, int32_t state, const char * path)
{
if (notified)
if (!notified)
{
LOG_WARNING(&Logger::get("WatchWithPromise"), "Ignoring event " << event << " with state "
<< state << (path ? std::string(" for path ") + path : ""));
return;
notified = true;
event->set();
}
promise.set_value(WatchEventInfo(event, state, path));
notified = true;
}
};
WatchWithPromisePtr ZooKeeper::watchForFuture(WatchFuture * future)
{
if (!future)
return nullptr;
WatchWithPromisePtr res = new WatchWithPromise(*this);
watch_store.insert(res);
*future = res->promise.get_future();
return res;
}
void ZooKeeper::processPromise(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx)
void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx)
{
if (watcherCtx)
{
WatchWithPromise * watch = reinterpret_cast<WatchWithPromise *>(watcherCtx);
WatchWithEvent * watch = reinterpret_cast<WatchWithEvent *>(watcherCtx);
watch->process(zh, type, state, path);
delete watch;
watch->zk.watch_store.erase(watch);
/// Гарантируется, что не-ZOO_SESSION_EVENT событие придет ровно один раз (https://issues.apache.org/jira/browse/ZOOKEEPER-890).
if (type != ZOO_SESSION_EVENT)
{
watch->zk.watch_store.erase(watch);
delete watch;
}
}
}
@ -128,21 +117,33 @@ ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std
init(args.hosts, args.session_timeout_ms, watch);
}
void * ZooKeeper::watchForEvent(EventPtr event)
{
if (event)
{
WatchWithEvent * res = new WatchWithEvent(*this, event);
watch_store.insert(res);
return reinterpret_cast<void *>(res);
}
else
{
return nullptr;
}
}
watcher_fn ZooKeeper::callbackForEvent(EventPtr event)
{
return event ? processEvent : nullptr;
}
int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Stat * stat_,
WatchFuture * watch)
EventPtr watch)
{
String_vector strings;
int code;
Stat stat;
if (watch)
{
code = zoo_wget_children2(impl, path.c_str(), processPromise, reinterpret_cast<void *>(watchForFuture(watch)), &strings, &stat);
}
else
{
code = zoo_wget_children2(impl, path.c_str(), nullptr, nullptr, &strings, &stat);
}
code = zoo_wget_children2(impl, path.c_str(), callbackForEvent(watch), watchForEvent(watch), &strings, &stat);
if (code == ZOK)
{
@ -157,7 +158,7 @@ int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
return code;
}
Strings ZooKeeper::getChildren(
const std::string & path, Stat * stat, WatchFuture * watch)
const std::string & path, Stat * stat, EventPtr watch)
{
Strings res;
check(tryGetChildren(path, res, stat, watch), path);
@ -165,7 +166,7 @@ Strings ZooKeeper::getChildren(
}
int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Stat * stat_, WatchFuture * watch)
Stat * stat_, EventPtr watch)
{
int32_t code = retry(boost::bind(&ZooKeeper::getChildrenImpl, this, boost::ref(path), boost::ref(res), stat_, watch));
@ -262,14 +263,11 @@ int32_t ZooKeeper::tryRemoveWithRetries(const std::string & path, int32_t versio
return retry(boost::bind(&ZooKeeper::tryRemove, this, boost::ref(path), version));
}
int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, WatchFuture * watch)
int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, EventPtr watch)
{
int32_t code;
Stat stat;
if (watch)
code = zoo_wexists(impl, path.c_str(), processPromise, reinterpret_cast<void *>(watchForFuture(watch)), &stat);
else
code = zoo_wexists(impl, path.c_str(), nullptr, nullptr, &stat);
code = zoo_wexists(impl, path.c_str(), callbackForEvent(watch), watchForEvent(watch), &stat);
if (code == ZOK)
{
@ -280,7 +278,7 @@ int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, WatchFutur
return code;
}
bool ZooKeeper::exists(const std::string & path, Stat * stat_, WatchFuture * watch)
bool ZooKeeper::exists(const std::string & path, Stat * stat_, EventPtr watch)
{
int32_t code = retry(boost::bind(&ZooKeeper::existsImpl, this, path, stat_, watch));
@ -292,16 +290,13 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat_, WatchFuture * wat
return true;
}
int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * stat_, WatchFuture * watch)
int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * stat_, EventPtr watch)
{
char buffer[MAX_NODE_SIZE];
int buffer_len = MAX_NODE_SIZE;
int32_t code;
Stat stat;
if (watch)
code = zoo_wget(impl, path.c_str(), processPromise, reinterpret_cast<void *>(watchForFuture(watch)), buffer, &buffer_len, &stat);
else
code = zoo_wget(impl, path.c_str(), nullptr, nullptr, buffer, &buffer_len, &stat);
code = zoo_wget(impl, path.c_str(), callbackForEvent(watch), watchForEvent(watch), buffer, &buffer_len, &stat);
if (code == ZOK)
{
@ -313,16 +308,16 @@ int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * s
return code;
}
std::string ZooKeeper::get(const std::string & path, Stat * stat, WatchFuture * watch)
std::string ZooKeeper::get(const std::string & path, Stat * stat, EventPtr watch)
{
std::string res;
if (tryGet(path, res, stat, watch))
return res;
else
throw KeeperException("Fail to get data for node " + path);
throw KeeperException("Can't get data for node " + path + ": node doesn't exist");
}
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, WatchFuture * watch)
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, EventPtr watch)
{
int32_t code = retry(boost::bind(&ZooKeeper::getImpl, this, boost::ref(path), boost::ref(res), stat_, watch));
@ -443,10 +438,9 @@ ZooKeeper::~ZooKeeper()
LOG_ERROR(&Logger::get("~ZooKeeper"), "Failed to close ZooKeeper session: " << zerror(code));
}
/// удаляем WatchWithPromise которые уже никогда не будут обработаны
for (WatchWithPromise * watch : watch_store)
/// удаляем WatchWithEvent которые уже никогда не будут обработаны
for (WatchWithEvent * watch : watch_store)
delete watch;
watch_store.clear();
}
ZooKeeperPtr ZooKeeper::startNewSession() const

View File

@ -26,11 +26,11 @@ void printStat(const zkutil::Stat & s)
std::cout << " pzxid: " << s.pzxid << std::endl;
}
void waitForWatch(zkutil::WatchFuture & future)
void waitForWatch(zkutil::EventPtr event)
{
std::cout << "waiting for watch" << std::endl;
zkutil::WatchEventInfo res = future.get();
std::cout << "event: " << res.event << std::endl;
event->wait();
std::cout << "watch event was signalled" << std::endl;
}
@ -93,14 +93,14 @@ int main(int argc, char ** argv)
std::string w;
ss >> w;
bool watch = w == "w";
zkutil::WatchFuture future;
std::vector<std::string> v = zk.getChildren(path, nullptr, watch ? &future : nullptr);
zkutil::EventPtr event = watch ? new Poco::Event() : nullptr;
std::vector<std::string> v = zk.getChildren(path, nullptr, event);
for (size_t i = 0; i < v.size(); ++i)
{
std::cout << v[i] << std::endl;
}
if (watch)
waitForWatch(future);
waitForWatch(event);
}
else if (cmd == "create")
{
@ -147,28 +147,28 @@ int main(int argc, char ** argv)
std::string w;
ss >> w;
bool watch = w == "w";
zkutil::WatchFuture future;
zkutil::EventPtr event = watch ? new Poco::Event() : nullptr;
zkutil::Stat stat;
bool e = zk.exists(path, &stat, watch ? &future : nullptr);
bool e = zk.exists(path, &stat, event);
if (e)
printStat(stat);
else
std::cout << path << " does not exist" << std::endl;
if (watch)
waitForWatch(future);
waitForWatch(event);
}
else if (cmd == "get")
{
std::string w;
ss >> w;
bool watch = w == "w";
zkutil::WatchFuture future;
zkutil::EventPtr event = watch ? new Poco::Event() : nullptr;
zkutil::Stat stat;
std::string data = zk.get(path, &stat, watch ? &future : nullptr);
std::string data = zk.get(path, &stat, event);
std::cout << "Data: " << data << std::endl;
printStat(stat);
if (watch)
waitForWatch(future);
waitForWatch(event);
}
else if (cmd == "set")
{

View File

@ -3,9 +3,6 @@
#include <unistd.h>
using namespace zkutil;
/** Проверяет, правда ли, что вызовы при просроченной сессии блокируются навсегда.
* Разорвать сессию можно, например, так: `./nozk.sh && sleep 6s && ./yeszk.sh`
*/
void watcher(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx)
{
@ -20,15 +17,14 @@ int main()
std::cout << "create path" << std::endl;
zk.create("/test", "old", zkutil::CreateMode::Persistent);
zkutil::Stat stat;
zkutil::WatchFuture watch;
zkutil::EventPtr watch = new Poco::Event;
std::cout << "get path" << std::endl;
zk.get("/test", &stat, &watch);
zk.get("/test", &stat, watch);
std::cout << "set path" << std::endl;
zk.set("/test", "new");
watch.wait();
WatchEventInfo event_info = watch.get();
std::cout << "watch happened for path: " << event_info.path << " " << event_info.event << std::endl;
watch->wait();
std::cout << "watch happened" << std::endl;
std::cout << "remove path" << std::endl;
zk.remove("/test");