diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h index b8081b77795..89bba7c1f8c 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h @@ -81,10 +81,7 @@ public: void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const { - Field field; - columns[0]->get(row_num, field); - ReadBufferFromString read_buffer(field.safeGet()); - nested_func->deserializeMerge(place, read_buffer); + merge(place, columns[0]->getDataAt(row_num).data); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index 7aa60c49226..2433ae1a610 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -103,18 +103,26 @@ public: return StringRef(reinterpret_cast(&data[n]), sizeof(data[n])); } + /// Объединить состояние в последней строке с заданным + void insertMerge(const Field & x) + { + ReadBufferFromString read_buffer(x.safeGet()); + func->deserializeMerge(data.back(), read_buffer); + } + void insert(const Field & x) { - insertDefault(); - ReadBufferFromString read_buffer(x.safeGet()); - func->deserializeMerge(data[data.size()-1], read_buffer); + data.push_back(AggregateDataPtr()); + func->create(data.back()); + insertMerge(x); } void insertData(const char * pos, size_t length) { - insertDefault(); + data.push_back(AggregateDataPtr()); + func->create(data.back()); ReadBuffer read_buffer(const_cast(pos), length); - func->deserializeMerge(data[data.size()-1], read_buffer); + func->deserializeMerge(data.back(), read_buffer); } ColumnPtr cut(size_t start, size_t length) const diff --git a/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h new file mode 100644 index 00000000000..5c3feb22c11 --- /dev/null +++ b/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h @@ -0,0 +1,87 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** Соединяет несколько сортированных потоков в один. + * При этом, для каждой группы идущих подряд одинаковых значений первичного ключа (столбцов, по которым сортируются данные), + * сливает их в одну строку. При слиянии, производится доагрегация данных - слияние состояний агрегатных функций, + * соответствующих одному значению первичного ключа. Для столбцов, не входящих в первичный ключ, и не имеющих тип AggregateFunction, + * при слиянии, выбирается первое попавшееся значение. + */ +class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream +{ +public: + AggregatingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), + log(&Logger::get("SummingSortedBlockInputStream")) + { + } + + String getName() const { return "AggregatingSortedBlockInputStream"; } + + String getID() const + { + std::stringstream res; + res << "AggregatingSorted(inputs"; + + for (size_t i = 0; i < children.size(); ++i) + res << ", " << children[i]->getID(); + + res << ", description"; + + for (size_t i = 0; i < description.size(); ++i) + res << ", " << description[i].getID(); + + res << ")"; + return res.str(); + } + +protected: + /// Может возвращаться на 1 больше записей, чем max_block_size. + Block readImpl(); + +private: + Logger * log; + + /// Столбцы с какими номерами надо аггрегировать. + ColumnNumbers column_numbers_to_aggregate; + std::vector column_to_aggregate; + + Row current_key; /// Текущий первичный ключ. + Row next_key; /// Первичный ключ следующей строки. + + Row current_row; + + /** Делаем поддержку двух разных курсоров - с Collation и без. + * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. + */ + template + void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue); + + /// Вставить в результат первую строку для текущей группы. + void insertCurrentRow(ColumnPlainPtrs & merged_columns); + + /** Извлечь все состояния аггрегатных функции и объединить с текущей группой. + */ + template + void addRow(TSortCursor & cursor) + { + for (size_t i = 0, size = column_numbers_to_aggregate.size(); i < size; ++i) + { + size_t j = column_numbers_to_aggregate[i]; + column_to_aggregate[i]->insertMerge((*cursor->all_columns[j])[cursor->pos]); + } + } +}; + +} diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h index adeef3c2a64..c40f17c4b51 100644 --- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -85,7 +85,7 @@ protected: QueueWithCollation queue_with_collation; - /// Эти методы используются в Collapsing/Summing SortedBlockInputStream-ах. + /// Эти методы используются в Collapsing/Summing/Aggregating SortedBlockInputStream-ах. /// Сохранить строчку, на которую указывает cursor, в row. template diff --git a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h index 3c532e6cbf2..96630fbcdce 100644 --- a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h @@ -61,7 +61,9 @@ public: ActiveDataPartSet(); void add(const String & name); - String getContainingPart(const String & name); + String getContainingPart(const String & name) const; + + Strings getParts() const; static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level); @@ -74,7 +76,7 @@ public: private: typedef std::set Parts; - Poco::Mutex mutex; + mutable Poco::Mutex mutex; Parts parts; }; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h index 914792d33a3..72e2cf6d21b 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -17,16 +17,7 @@ public: auto part_blocks = storage.writer.splitBlockIntoParts(block); for (auto & current_block : part_blocks) { - size_t parts_count = storage.data.getMaxPartsCountForMonth(); - if (parts_count > storage.data.settings.parts_to_delay_insert) - { - double delay = std::pow(storage.data.settings.insert_delay_step, parts_count - storage.data.settings.parts_to_delay_insert); - delay /= 1000; - delay = std::min(delay, 5 * 60.); /// Ограничим задержку 5 минутами. - LOG_INFO(storage.log, "Delaying inserting block by " - << std::fixed << std::setprecision(4) << delay << "s because there are " << parts_count << " parts"); - std::this_thread::sleep_for(std::chrono::duration(delay)); - } + storage.data.delayInsertIfNeeded(); UInt64 temp_index = storage.increment.get(); MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, temp_index); diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 6051254d7c4..cf8b5806151 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -350,6 +350,7 @@ public: Ordinary, Collapsing, Summing, + Aggregating, }; /** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце), @@ -398,6 +399,10 @@ public: */ size_t getMaxPartsCountForMonth(); + /** Если в таблице слишком много активных кусков, спит некоторое время, чтобы дать им возможность смерджиться. + */ + void delayInsertIfNeeded(); + /** Возвращает кусок с указанным именем или кусок, покрывающий его. Если такого нет, возвращает nullptr. * Если including_inactive, просматриваются также неактивные куски (all_data_parts). * При including_inactive, нахождение куска гарантируется только если есть кусок, совпадающий с part_name; diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 5044c1a4efc..b501cd4e7a2 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -20,6 +20,8 @@ public: auto part_blocks = storage.writer.splitBlockIntoParts(block); for (auto & current_block : part_blocks) { + storage.data.delayInsertIfNeeded(); + ++block_index; String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); time_t min_date_time = DateLUTSingleton::instance().fromDayNum(DayNum_t(current_block.min_date)); diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 9e1fa62b5d2..d8a384f6812 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -72,45 +72,6 @@ public: private: friend class ReplicatedMergeTreeBlockOutputStream; - /// Добавляет куски в множество currently_merging. - struct CurrentlyMergingPartsTagger - { - Strings parts; - StorageReplicatedMergeTree & storage; - - CurrentlyMergingPartsTagger(const Strings & parts_, StorageReplicatedMergeTree & storage_) - : parts(parts_), storage(storage_) - { - Poco::ScopedLock lock(storage.currently_merging_mutex); - for (const auto & name : parts) - { - if (storage.currently_merging.count(name)) - throw Exception("Tagging alreagy tagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); - } - storage.currently_merging.insert(parts.begin(), parts.end()); - } - - ~CurrentlyMergingPartsTagger() - { - try - { - Poco::ScopedLock lock(storage.currently_merging_mutex); - for (const auto & name : parts) - { - if (!storage.currently_merging.count(name)) - throw Exception("Untagging already untagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); - storage.currently_merging.erase(name); - } - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - }; - - typedef Poco::SharedPtr CurrentlyMergingPartsTaggerPtr; - /// Добавляет кусок в множество future_parts. struct FuturePartTagger { @@ -152,17 +113,16 @@ private: String znode_name; Type type; - String source_replica; + String source_replica; /// Пустая строка значит, что эта запись была добавлена сразу в очередь, а не скопирована из лога. String new_part_name; Strings parts_to_merge; - CurrentlyMergingPartsTaggerPtr currently_merging_tagger; FuturePartTaggerPtr future_part_tagger; - void tagPartsAsCurrentlyMerging(StorageReplicatedMergeTree & storage) + void addResultToVirtualParts(StorageReplicatedMergeTree & storage) { - if (type == MERGE_PARTS) - currently_merging_tagger = new CurrentlyMergingPartsTagger(parts_to_merge, storage); + if (type == MERGE_PARTS || type == GET_PART) + storage.virtual_parts.add(new_part_name); } void tagPartAsFuture(StorageReplicatedMergeTree & storage) @@ -205,9 +165,8 @@ private: /// Если true, таблица в офлайновом режиме, и в нее нельзя писать. bool is_read_only = false; - /// Куски, для которых в очереди есть задание на слияние. - StringSet currently_merging; - Poco::FastMutex currently_merging_mutex; + /// Каким будет множество активных кусков после выполнения всей текущей очереди. + ActiveDataPartSet virtual_parts; /** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/). * В ZK записи в хронологическом порядке. Здесь - не обязательно. @@ -294,13 +253,12 @@ private: /// Инициализация. - /** Проверяет, что в ZooKeeper в таблице нет данных. - */ - bool isTableEmpty(); - /** Создает минимальный набор нод в ZooKeeper. */ void createTable(); + + /** Создает реплику в ZooKeeper и добавляет в очередь все, что нужно, чтобы догнать остальные реплики. + */ void createReplica(); /** Отметить в ZooKeeper, что эта реплика сейчас активна. @@ -319,6 +277,9 @@ private: */ void checkParts(); + /// Положить все куски из data в virtual_parts. + void initVirtualParts(); + /// Запустить или остановить фоновые потоки. Используется для частичной переинициализации при пересоздании сессии в ZooKeeper. void startup(); void partialShutdown(); diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp new file mode 100644 index 00000000000..0694e0ee8c9 --- /dev/null +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -0,0 +1,117 @@ +#include + + +namespace DB +{ + + +void AggregatingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_columns) +{ + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insert(current_row[i]); +} + + +Block AggregatingSortedBlockInputStream::readImpl() +{ + if (!children.size()) + return Block(); + + if (children.size() == 1) + return children[0]->read(); + + Block merged_block; + ColumnPlainPtrs merged_columns; + + init(merged_block, merged_columns); + if (merged_columns.empty()) + return Block(); + + /// Дополнительная инициализация. + if (current_row.empty()) + { + current_row.resize(num_columns); + current_key.resize(description.size()); + next_key.resize(description.size()); + + /// Заполним номера столбцов, которые нужно доагрегировать. + for (size_t i = 0; i < num_columns; ++i) + { + ColumnWithNameAndType & column = merged_block.getByPosition(i); + + /// Оставляем только состояния аггрегатных функций. + if (strncmp(column.type->getName().data(), "AggregateFunction", strlen("AggregateFunction")) != 0) + continue; + + /// Входят ли в PK? + SortDescription::const_iterator it = description.begin(); + for (; it != description.end(); ++it) + if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i)) + break; + + if (it != description.end()) + continue; + + column_numbers_to_aggregate.push_back(i); + column_to_aggregate.push_back(dynamic_cast(merged_columns[i])); + } + } + + if (has_collation) + merge(merged_block, merged_columns, queue_with_collation); + else + merge(merged_block, merged_columns, queue); + + return merged_block; +} + + +template +void AggregatingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue) +{ + size_t merged_rows = 0; + + /// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size + while (!queue.empty()) + { + TSortCursor current = queue.top(); + + setPrimaryKey(next_key, current); + + /// если накопилось достаточно строк и последняя посчитана полностью + if (next_key != current_key && merged_rows >= max_block_size) + return; + + queue.pop(); + + if (next_key != current_key) + { + current_key = std::move(next_key); + next_key.resize(description.size()); + + ++merged_rows; + /// Запишем данные для очередной группы. + setRow(current_row, current); + insertCurrentRow(merged_columns); + } + else + { + addRow(current); + } + + if (!current->isLast()) + { + current->next(); + queue.push(current); + } + else + { + /// Достаём из соответствующего источника следующий блок, если есть. + fetchNextBlock(current, queue); + } + } + + children.clear(); +} + +} diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 2382940fa21..8da4bca0155 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -530,8 +530,10 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi try { for (size_t i = 0; i < aggregates_size; ++i) - { is_final[i] = final && aggregate_functions[i]->canBeFinal(); + + for (size_t i = 0; i < aggregates_size; ++i) + { if (!is_final[i]) { /// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций. diff --git a/dbms/src/Storages/ITableDeclaration.cpp b/dbms/src/Storages/ITableDeclaration.cpp index 8df9b195b73..385f246ab17 100644 --- a/dbms/src/Storages/ITableDeclaration.cpp +++ b/dbms/src/Storages/ITableDeclaration.cpp @@ -37,7 +37,7 @@ NameAndTypePair ITableDeclaration::getRealColumn(const String & column_name) con for (auto & it : real_columns) if (it.first == column_name) return it; - throw Exception("There is no column " + column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); } @@ -60,7 +60,7 @@ const DataTypePtr ITableDeclaration::getDataTypeByName(const String & column_nam if (it->first == column_name) return it->second; - throw Exception("There is no column " + column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); } @@ -115,8 +115,7 @@ void ITableDeclaration::check(const Names & column_names) const const NamesAndTypesList & available_columns = getColumnsList(); if (column_names.empty()) - throw Exception("Empty list of columns queried for table " + getTableName() - + ". There are columns: " + listOfColumns(available_columns), + throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns), ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED); const NamesAndTypesMap & columns_map = getColumnsMap(available_columns); @@ -128,12 +127,11 @@ void ITableDeclaration::check(const Names & column_names) const for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it) { if (columns_map.end() == columns_map.find(*it)) - throw Exception("There is no column with name " + *it + " in table " + getTableName() - + ". There are columns: " + listOfColumns(available_columns), + throw Exception("There is no column with name " + *it + " in table. There are columns: " + listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (unique_names.end() != unique_names.find(*it)) - throw Exception("Column " + *it + " queried more than once in table " + getTableName(), + throw Exception("Column " + *it + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); unique_names.insert(*it); } @@ -160,14 +158,12 @@ void ITableDeclaration::check(const Block & block, bool need_all) const NamesAndTypesMap::const_iterator it = columns_map.find(column.name); if (columns_map.end() == it) - throw Exception("There is no column with name " + column.name + " in table " + getTableName() - + ". There are columns: " + listOfColumns(available_columns), - ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + throw Exception("There is no column with name " + column.name + ". There are columns: " + + listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (column.type->getName() != it->second->getName()) - throw Exception("Type mismatch for column " + column.name + " in table " + getTableName() - + ". Column has type " + it->second->getName() + ", got type " + column.type->getName(), - ErrorCodes::TYPE_MISMATCH); + throw Exception("Type mismatch for column " + column.name + ". Column has type " + + it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH); } if (need_all && names_in_block.size() < columns_map.size()) diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp index d5ca3746dd0..54c471f5497 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -41,7 +41,7 @@ void ActiveDataPartSet::add(const String & name) parts.insert(part); } -String ActiveDataPartSet::getContainingPart(const String & part_name) +String ActiveDataPartSet::getContainingPart(const String & part_name) const { Poco::ScopedLock lock(mutex); @@ -69,6 +69,19 @@ String ActiveDataPartSet::getContainingPart(const String & part_name) return ""; } +Strings ActiveDataPartSet::getParts() const +{ + Poco::ScopedLock lock(mutex); + + Strings res; + for (const Part & part : parts) + { + res.push_back(part.name); + } + + return res; +} + String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 081cbab36d4..3991c57cab5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -678,6 +678,20 @@ size_t MergeTreeData::getMaxPartsCountForMonth() return res; } +void MergeTreeData::delayInsertIfNeeded() +{ + size_t parts_count = getMaxPartsCountForMonth(); + if (parts_count > settings.parts_to_delay_insert) + { + double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert); + delay /= 1000; + delay = std::min(delay, 5 * 60.); /// Ограничим задержку 5 минутами. + LOG_INFO(log, "Delaying inserting block by " + << std::fixed << std::setprecision(4) << delay << "s because there are " << parts_count << " parts"); + std::this_thread::sleep_for(std::chrono::duration(delay)); + } +} + MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_name, bool including_inactive) { MutableDataPartPtr tmp_part(new DataPart(*this)); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 8c9690aa7d7..acd8827c88d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -291,6 +292,10 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData:: merged_stream = new SummingSortedBlockInputStream(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; + case MergeTreeData::Aggregating: + merged_stream = new AggregatingSortedBlockInputStream(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + break; + default: throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index f19cb113dbb..7c5ca66eed2 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -55,9 +55,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( if (!zookeeper->exists(zookeeper_path)) createTable(); - if (!isTableEmpty()) - throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE); - checkTableStructure(); createReplica(); } @@ -67,6 +64,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( checkParts(); } + initVirtualParts(); loadQueue(); String unreplicated_path = full_path + "unreplicated/"; @@ -128,6 +126,8 @@ static String formattedAST(const ASTPtr & ast) void StorageReplicatedMergeTree::createTable() { + LOG_DEBUG(log, "Creating table " << zookeeper_path); + zookeeper->create(zookeeper_path, "", zkutil::CreateMode::Persistent); /// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы. @@ -196,12 +196,107 @@ void StorageReplicatedMergeTree::checkTableStructure() void StorageReplicatedMergeTree::createReplica() { + LOG_DEBUG(log, "Creating replica " << replica_path); + + /** Запомним список других реплик. + * NOTE: Здесь есть race condition. Если почти одновременно добавить нескольких реплик, сразу же начиная в них писать, + * небольшая часть данных может не реплицироваться. + */ + Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); + + /// Создадим пустую реплику. zookeeper->create(replica_path, "", zkutil::CreateMode::Persistent); zookeeper->create(replica_path + "/host", "", zkutil::CreateMode::Persistent); zookeeper->create(replica_path + "/log", "", zkutil::CreateMode::Persistent); zookeeper->create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent); zookeeper->create(replica_path + "/queue", "", zkutil::CreateMode::Persistent); zookeeper->create(replica_path + "/parts", "", zkutil::CreateMode::Persistent); + + /// Если таблица пуста, больше ничего делать не нужно. + if (replicas.empty()) + { + LOG_DEBUG(log, "No other replicas"); + return; + } + + /// "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатели на логи. + String source_replica = replicas[0]; + + /** Дождемся, пока все активные реплики заметят появление этой реплики. + * Это не даст им удалять записи из своих логов, пока эта реплика их не скопирует. + */ + for (const String & replica : replicas) + { + LOG_DEBUG(log, "Waiting for " << replica << " to acknowledge me"); + + bool active = true; + while(true) + { + if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) + { + active = false; + break; + } + if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name)) + break; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + /// Будем предпочитать активную реплику в качестве эталонной. + if (active) + source_replica = replica; + } + + LOG_INFO(log, "Will mimic " << source_replica); + + String source_path = zookeeper_path + "/replicas/" + source_replica; + + /// Порядок следующих трех действий важен. Записи в логе могут продублироваться, но не могут потеряться. + + /// Скопируем у эталонной реплики ссылки на все логи. + for (const String & replica : replicas) + { + String pointer = zookeeper->get(source_path + "/log_pointers/" + replica); + zookeeper->create(replica_path + "/log_pointers/" + replica, pointer, zkutil::CreateMode::Persistent); + } + + /// Запомним очередь эталонной реплики. + Strings source_queue_names = zookeeper->getChildren(source_path + "/queue"); + std::sort(source_queue_names.begin(), source_queue_names.end()); + Strings source_queue; + for (const String & entry_name : source_queue_names) + { + String entry; + if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry)) + continue; + source_queue.push_back(entry); + } + + /// Добавим в очередь задания на получение всех активных кусков, которые есть у эталонной реплики. + Strings parts = zookeeper->getChildren(source_path + "/parts"); + ActiveDataPartSet active_parts_set; + for (const String & part : parts) + { + active_parts_set.add(part); + } + Strings active_parts = active_parts_set.getParts(); + for (const String & name : active_parts) + { + LogEntry log_entry; + log_entry.type = LogEntry::GET_PART; + log_entry.source_replica = ""; + log_entry.new_part_name = name; + + zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential); + } + LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched"); + + /// Добавим в очередь содержимое очереди эталонной реплики. + for (const String & entry : source_queue) + { + zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential); + } + LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries"); } void StorageReplicatedMergeTree::activateReplica() @@ -240,17 +335,6 @@ void StorageReplicatedMergeTree::activateReplica() replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", *zookeeper); } -bool StorageReplicatedMergeTree::isTableEmpty() -{ - Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); - for (const auto & replica : replicas) - { - if (!zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty()) - return false; - } - return true; -} - void StorageReplicatedMergeTree::checkParts() { Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts"); @@ -361,7 +445,7 @@ void StorageReplicatedMergeTree::checkParts() LogEntry log_entry; log_entry.type = LogEntry::GET_PART; - log_entry.source_replica = replica_name; + log_entry.source_replica = ""; log_entry.new_part_name = name; /// Полагаемся, что это происходит до загрузки очереди (loadQueue). @@ -381,6 +465,15 @@ void StorageReplicatedMergeTree::checkParts() } } +void StorageReplicatedMergeTree::initVirtualParts() +{ + auto parts = data.getDataParts(); + for (const auto & part : parts) + { + virtual_parts.add(part->name); + } +} + void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops) { String another_replica = findReplicaHavingPart(part->name, false); @@ -503,7 +596,7 @@ void StorageReplicatedMergeTree::loadQueue() String s = zookeeper->get(replica_path + "/queue/" + child); LogEntry entry = LogEntry::parse(s); entry.znode_name = child; - entry.tagPartsAsCurrentlyMerging(*this); + entry.addResultToVirtualParts(*this); queue.push_back(entry); } } @@ -592,7 +685,7 @@ void StorageReplicatedMergeTree::pullLogsToQueue() String path_created = dynamic_cast((*results)[0]).getPathCreated(); entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); - entry.tagPartsAsCurrentlyMerging(*this); + entry.addResultToVirtualParts(*this); queue.push_back(entry); ++iterator.index; @@ -863,7 +956,6 @@ void StorageReplicatedMergeTree::queueThread() if (success) { - entry.currently_merging_tagger = nullptr; std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP); } else @@ -874,7 +966,6 @@ void StorageReplicatedMergeTree::queueThread() Poco::ScopedLock lock(queue_mutex); queue.push_back(entry); } - entry.currently_merging_tagger = nullptr; std::this_thread::sleep_for(QUEUE_ERROR_SLEEP); } } @@ -891,13 +982,35 @@ void StorageReplicatedMergeTree::mergeSelectingThread() try { size_t merges_queued = 0; + /// Есть ли в очереди мердж крупных кусков. + /// TODO: Если мердж уже выполняется, его нет в очереди, но здесь нужно все равно как-то о нем узнать. + bool has_big_merge = false; { Poco::ScopedLock lock(queue_mutex); for (const auto & entry : queue) + { if (entry.type == LogEntry::MERGE_PARTS) + { ++merges_queued; + + if (!has_big_merge) + { + for (const String & name : entry.parts_to_merge) + { + MergeTreeData::DataPartPtr part = data.getContainingPart(name); + if (!part || part->name != name) + continue; + if (part->size * data.index_granularity > 25 * 1024 * 1024) + { + has_big_merge = true; + break; + } + } + } + } + } } if (merges_queued >= data.settings.merging_threads) @@ -906,35 +1019,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread() continue; } - /// Есть ли активный мердж крупных кусков. - bool has_big_merge = false; - - { - Poco::ScopedLock lock(currently_merging_mutex); - - for (const auto & name : currently_merging) - { - MergeTreeData::DataPartPtr part = data.getContainingPart(name); - if (!part) - continue; - if (part->name != name) - { - LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in " << part->name); - continue; - } - if (part->size * data.index_granularity > 25 * 1024 * 1024) - { - has_big_merge = true; - break; - } - } - } - MergeTreeData::DataPartsVector parts; { - Poco::ScopedLock lock(currently_merging_mutex); - String merged_name; auto can_merge = std::bind( &StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2); @@ -963,7 +1050,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() if (success) { /// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния. - /// (чтобы куски пометились как currently_merging). + /// (чтобы кусок добавился в virtual_parts). pullLogsToQueue(); String month_name = parts[0]->name.substr(0, 6); @@ -1021,7 +1108,9 @@ void StorageReplicatedMergeTree::clearOldBlocksThread() bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) { - if (currently_merging.count(left->name) || currently_merging.count(right->name)) + /// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать. + if (virtual_parts.getContainingPart(left->name) != left->name || + virtual_parts.getContainingPart(right->name) != right->name) return false; String month_name = left->name.substr(0, 6); @@ -1109,7 +1198,11 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin void StorageReplicatedMergeTree::shutdown() { if (permanent_shutdown_called) + { + if (restarting_thread.joinable()) + restarting_thread.join(); return; + } permanent_shutdown_called = true; restarting_thread.join(); } @@ -1152,9 +1245,24 @@ void StorageReplicatedMergeTree::goReadOnly() leader_election = nullptr; replica_is_active_node = nullptr; merger.cancelAll(); - is_leader_node = false; endpoint_holder = nullptr; + + LOG_TRACE(log, "Waiting for threads to finish"); + if (is_leader_node) + { + is_leader_node = false; + if (merge_selecting_thread.joinable()) + merge_selecting_thread.join(); + if (clear_old_blocks_thread.joinable()) + clear_old_blocks_thread.join(); + } + if (queue_updating_thread.joinable()) + queue_updating_thread.join(); + for (auto & thread : queue_threads) + thread.join(); + queue_threads.clear(); + LOG_TRACE(log, "Threads finished"); } void StorageReplicatedMergeTree::startup()