diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h
index 9b955adbbe6..099ffe73938 100644
--- a/dbms/include/DB/Interpreters/Context.h
+++ b/dbms/include/DB/Interpreters/Context.h
@@ -47,6 +47,7 @@ class Macros;
 struct Progress;
 class Clusters;
 class QueryLog;
+class PartLog;
 struct MergeTreeSettings;
 class IDatabase;
 class DDLGuard;
@@ -282,6 +283,7 @@ public:
     Compiler & getCompiler();
     QueryLog & getQueryLog();
+    PartLog * getPartLog();
     const MergeTreeSettings & getMergeTreeSettings();
 
     /// Prevents DROP TABLE if its size is greater than max_size (50GB by default; max_size = 0 turns this check off).
diff --git a/dbms/include/DB/Interpreters/PartLog.h b/dbms/include/DB/Interpreters/PartLog.h
new file mode 100644
index 00000000000..ede5cf198b9
--- /dev/null
+++ b/dbms/include/DB/Interpreters/PartLog.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <DB/Interpreters/SystemLog.h>
+
+
+namespace DB
+{
+
+struct PartLogElement
+{
+    enum Type
+    {
+        NEW_PART = 1,
+        MERGE_PARTS = 2,
+        DOWNLOAD_PART = 3,
+    };
+
+    Type event_type = NEW_PART;
+
+    time_t event_time{};
+
+    UInt64 size_in_bytes{};
+    UInt64 duration_ms{};
+
+    String database_name;
+    String table_name;
+    String part_name;
+    Strings merged_from;
+
+    static std::string name() { return "PartLog"; }
+
+    static Block createBlock();
+    void appendToBlock(Block & block) const;
+};
+
+
+/// Instead of a typedef - to allow forward declaration.
+class PartLog : public SystemLog<PartLogElement>
+{
+    using SystemLog<PartLogElement>::SystemLog;
+};
+
+}
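Taken together, PartLogElement and PartLog define the whole mechanism: a call site fills an element and queues it, and the SystemLog base class flushes the queue into the system table in the background. A minimal sketch of the caller-side pattern that the hunks below repeat three times (NEW_PART, MERGE_PARTS, DOWNLOAD_PART; for merges, merged_from is additionally filled with the source part names); here `context` and `new_data_part` stand in for whatever the real call site has at hand:

    Stopwatch stopwatch;
    PartLogElement elem;
    elem.event_time = time(0);

    /// ... create, merge, or download a data part ...

    if (PartLog * part_log = context.getPartLog())    /// nullptr if part_log is not configured.
    {
        elem.event_type = PartLogElement::NEW_PART;
        elem.size_in_bytes = new_data_part->size_in_bytes;
        elem.duration_ms = stopwatch.elapsed() / 1000000;    /// Stopwatch counts nanoseconds.
        elem.database_name = new_data_part->storage.getDatabaseName();
        elem.table_name = new_data_part->storage.getTableName();
        elem.part_name = new_data_part->name;
        part_log->add(elem);
    }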
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
index c5880f6b157..2bf03c34ae8 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
@@ -34,6 +34,7 @@ namespace ErrorCodes
     extern const int TABLE_DIFFERS_TOO_MUCH;
 }
 
+
 /// Data structure for *MergeTree engines.
 /// Merge tree is used for incremental sorting of data.
 /// The table consists of several sorted parts.
@@ -77,7 +78,7 @@ namespace ErrorCodes
 /// - MergeTreeDataWriter
 /// - MergeTreeDataMerger
 
- class MergeTreeData : public ITableDeclaration
+class MergeTreeData : public ITableDeclaration
 {
     friend class ReshardingWorker;
 
@@ -229,7 +230,8 @@ public:
     /// index_granularity - how many rows correspond to one primary key value.
     /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
     /// attach - whether the existing table is attached or a new table is created.
-    MergeTreeData( const String & full_path_, NamesAndTypesListPtr columns_,
+    MergeTreeData( const String & database_, const String & table_,
+                   const String & full_path_, NamesAndTypesListPtr columns_,
         const NamesAndTypesList & materialized_columns_,
         const NamesAndTypesList & alias_columns_,
         const ColumnDefaults & column_defaults_,
@@ -261,11 +263,6 @@ public:
 
     Int64 getMaxDataPartIndex();
 
-    std::string getTableName() const override
-    {
-        throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR);
-    }
-
     const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
 
     NameAndTypePair getColumn(const String & column_name) const override
@@ -288,6 +285,10 @@ public:
             || column_name == "_sample_factor";
     }
 
+    String getDatabaseName() const { return database_name; }
+
+    String getTableName() const override { return table_name; }
+
     String getFullPath() const { return full_path; }
 
     String getLogName() const { return log_name; }
@@ -482,6 +483,8 @@ private:
     ExpressionActionsPtr primary_expr;
     SortDescription sort_descr;
 
+    String database_name;
+    String table_name;
     String full_path;
 
     NamesAndTypesListPtr columns;
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h
index 2c5313ea322..995e5f2c5b3 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h
@@ -6,6 +6,7 @@
 #include
 #include
+#include
 #include
 
@@ -40,7 +41,7 @@ using BlocksWithDateIntervals = std::list<BlockWithDateInterval>;
 class MergeTreeDataWriter
 {
 public:
-    MergeTreeDataWriter(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (Writer)")) {}
+    MergeTreeDataWriter(MergeTreeData & data_, Context & context_) : data(data_), context(context_), log(&Logger::get(data.getLogName() + " (Writer)")) {}
 
     /** Split the block into blocks; each of them must be written as a separate part.
      *  (split rows by months)
@@ -56,6 +57,7 @@ public:
 
 private:
     MergeTreeData & data;
+    Context & context;
 
     Logger * log;
 };
diff --git a/dbms/scripts/merge_algorithm/add_parts.sh b/dbms/scripts/merge_algorithm/add_parts.sh
new file mode 100644
index 00000000000..e514428f3f4
--- /dev/null
+++ b/dbms/scripts/merge_algorithm/add_parts.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+for (( i = 0; i < 1000; i++ )); do
+    if (( RANDOM % 10 )); then
+        clickhouse-client --port=9007 --query="INSERT INTO mt (x) SELECT rand64() AS x FROM system.numbers LIMIT 100000"
+    else
+        clickhouse-client --port=9007 --query="INSERT INTO mt (x) SELECT rand64() AS x FROM system.numbers LIMIT 300000"
+    fi
+
+done
diff --git a/dbms/scripts/merge_algorithm/drawer.py b/dbms/scripts/merge_algorithm/drawer.py
new file mode 100644
index 00000000000..bca31a7c92a
--- /dev/null
+++ b/dbms/scripts/merge_algorithm/drawer.py
@@ -0,0 +1,76 @@
+from __future__ import print_function, division
+
+import argparse
+import matplotlib.pyplot as plt
+import ast
+
+TMP_FILE = 'tmp.tsv'
+
+def parse_args():
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('-f', '--file', default='data.tsv')
+    cfg = parser.parse_args()
+    return cfg
+
+def draw():
+    place = dict()  # part name -> [level, x, weight, width]
+    max_coord = 0
+    global_top = 0
+    for line in open(TMP_FILE):
+        numbers = line.split('\t')
+        if len(numbers) <= 2:
+            continue
+        name = numbers[-2]
+        if numbers[0] == '1':  # new part: place it on the bottom level
+            dx = int(numbers[3])
+            max_coord += dx
+            place[name] = [1, max_coord, 1, dx]
+            max_coord += dx
+            plt.plot([max_coord - 2 * dx, max_coord], [1, 1])
+    for line in open(TMP_FILE):
+        numbers = line.split('\t')
+        if len(numbers) <= 2:
+            continue
+        name = numbers[-2]
+        if numbers[0] == '2':  # merge: place the result one level above its sources
+            parts = ast.literal_eval(numbers[-1])
+            coord = [0, 0, 0, 0]
+            for cur_name in parts:
+                coord[0] = max(place[cur_name][0], coord[0])
+                coord[1] += place[cur_name][1] * place[cur_name][2]
+                coord[2] += place[cur_name][2]
+                coord[3] += place[cur_name][3]
+            coord[1] /= coord[2]
+            coord[0] += 1
+            global_top = max(global_top, coord[0])
+            place[name] = coord
+            for cur_name in parts:
+                plt.plot([coord[1], place[cur_name][1]], [coord[0], place[cur_name][0]])
+            plt.plot([coord[1] - coord[3], coord[1] + coord[3]], [coord[0], coord[0]])
+    plt.plot([0], [global_top + 1])
+    plt.plot([0], [-1])
+    plt.show()
+
+
+def convert(input_file):
+    print(input_file)
+    tmp_file = open(TMP_FILE, "w")
+    for line in open(input_file):
+        numbers = line.split('\t')
+        numbers2 = numbers[-2].split('_')
+        if numbers2[-2] == numbers2[-3]:
+            numbers2[-2] = str(int(numbers2[-2]) + 1)
+            numbers2[-3] = str(int(numbers2[-3]) + 1)
+            numbers[-2] = '_'.join(numbers2[1:])
+            print('\t'.join(numbers), end='', file=tmp_file)
+        else:
+            print(line, end='', file=tmp_file)
+
+def main():
+    cfg = parse_args()
+    convert(cfg.file)
+    draw()
+
+if __name__ == '__main__':
+    main()
+
diff --git a/dbms/scripts/merge_algorithm/stats.py b/dbms/scripts/merge_algorithm/stats.py
new file mode 100644
index 00000000000..bfda20b5d7f
--- /dev/null
+++ b/dbms/scripts/merge_algorithm/stats.py
@@ -0,0 +1,61 @@
+import time
+import ast
+from datetime import datetime
+
+FILE = 'data.tsv'
+
+def get_metrics():
+    data = []
+    time_to_merge = 0
+    count_of_parts = 0
+    max_count_of_parts = 0
+    parts_in_time = []
+    last_date = 0
+    for line in open(FILE):  # take the timestamp of the first event as the starting point
+        fields = line.split('\t')
+        last_date = datetime.strptime(fields[2], '%Y-%m-%d %H:%M:%S')
+        break
+
+    for line in open(FILE):
+        fields = line.split('\t')
+        cur_date = datetime.strptime(fields[2], '%Y-%m-%d %H:%M:%S')
+        if fields[0] == '2':  # merge event: consumes the source parts, produces one
+            time_to_merge += int(fields[4])
+            parts = ast.literal_eval(fields[-1])
+            count_of_parts -= len(parts) - 1
+        else:
+            count_of_parts += 1
+
+        if max_count_of_parts < count_of_parts:
+            max_count_of_parts = count_of_parts
+
+        parts_in_time.append([(cur_date - last_date).total_seconds(), count_of_parts])
+        last_date = cur_date
+
+    stats_parts_in_time = []
+    global_time = 0
+    average_parts = 0
+    for i in range(max_count_of_parts + 1):
+        stats_parts_in_time.append(0)
+
+    for elem in parts_in_time:
+        stats_parts_in_time[elem[1]] += elem[0]
+        global_time += elem[0]
+        average_parts += elem[0] * elem[1]
+
+    for i in range(max_count_of_parts + 1):  # normalize every bucket, including the last
+        stats_parts_in_time[i] /= global_time
+    average_parts /= global_time
+
+    return time_to_merge, max_count_of_parts, average_parts, stats_parts_in_time
+
+def main():
+    time_to_merge, max_parts, average_parts, stats_parts = get_metrics()
+    print('time_to_merge=', time_to_merge)
+    print('max_parts=', max_parts)
+    print('average_parts=', average_parts)
+    print('stats_parts=', stats_parts)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index 5d51dad1a56..57960a98d37 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -33,6 +33,7 @@
 #include
 #include
 #include
+#include <DB/Interpreters/PartLog.h>
 #include
 #include
 #include
@@ -95,8 +96,8 @@ struct ContextShared
 
     mutable zkutil::ZooKeeperPtr zookeeper;    /// ZooKeeper client.
 
-    String interserver_io_host;                /// The host name by which this server is available to other servers.
-    int interserver_io_port;                   /// ...and the port.
+    String interserver_io_host;     /// The host name by which this server is available to other servers.
+    int interserver_io_port;        /// ...and the port.
 
     String path;        /// Path to the data directory, with a slash at the end.
     String tmp_path;    /// Path to temporary files created during query processing.
@@ -121,6 +122,7 @@ struct ContextShared
     Macros macros;                              /// Substitutions extracted from the config.
     std::unique_ptr<Compiler> compiler;         /// Used for dynamic compilation of parts of queries, if necessary.
     std::unique_ptr<QueryLog> query_log;        /// Used to log queries.
+    std::unique_ptr<PartLog> part_log;          /// Used to log operations with data parts.
     /// Rules for choosing the compression method depending on the part size.
     mutable std::unique_ptr<CompressionMethodSelector> compression_method_selector;
     std::unique_ptr<MergeTreeSettings> merge_tree_settings;    /// Settings of MergeTree* engines.
@@ -1061,6 +1063,34 @@ QueryLog & Context::getQueryLog()
 }
 
 
+PartLog * Context::getPartLog()
+{
+    auto lock = getLock();
+
+    auto & config = Poco::Util::Application::instance().config();
+    if (!config.has("part_log"))
+        return nullptr;
+
+    if (!shared->part_log)
+    {
+        if (shared->shutdown_called)
+            throw Exception("Will not get part_log because shutdown was called", ErrorCodes::LOGICAL_ERROR);
+
+        if (!global_context)
+            throw Exception("Logical error: no global context for part log", ErrorCodes::LOGICAL_ERROR);
+
+        String database = config.getString("part_log.database", "system");
+        String table = config.getString("part_log.table", "part_log");
+        size_t flush_interval_milliseconds = parse<size_t>(
+            config.getString("part_log.flush_interval_milliseconds", DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS_STR));
+        shared->part_log = std::make_unique<PartLog>(
+            *global_context, database, table, "MergeTree(event_date, event_time, 1024)", flush_interval_milliseconds);
+    }
+
+    return shared->part_log.get();
+}
+
 
 CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part_size_ratio) const
 {
     auto lock = getLock();
diff --git a/dbms/src/Interpreters/PartLog.cpp b/dbms/src/Interpreters/PartLog.cpp
new file mode 100644
index 00000000000..7b025bbfd7e
--- /dev/null
+++ b/dbms/src/Interpreters/PartLog.cpp
@@ -0,0 +1,63 @@
+#include <DB/Columns/ColumnsNumber.h>
+#include <DB/Columns/ColumnString.h>
+#include <DB/Columns/ColumnArray.h>
+#include <DB/DataTypes/DataTypesNumber.h>
+#include <DB/DataTypes/DataTypeDate.h>
+#include <DB/DataTypes/DataTypeDateTime.h>
+#include <DB/DataTypes/DataTypeString.h>
+#include <DB/DataTypes/DataTypeArray.h>
+#include <DB/Core/Block.h>
+#include <DB/Core/Field.h>
+#include <common/DateLUT.h>
+#include <DB/Interpreters/PartLog.h>
+
+
+namespace DB
+{
+
+Block PartLogElement::createBlock()
+{
+    return
+    {
+        {std::make_shared<ColumnUInt8>(),    std::make_shared<DataTypeUInt8>(),    "event_type"},
+
+        {std::make_shared<ColumnUInt16>(),   std::make_shared<DataTypeDate>(),     "event_date"},
+        {std::make_shared<ColumnUInt32>(),   std::make_shared<DataTypeDateTime>(), "event_time"},
+
+        {std::make_shared<ColumnUInt64>(),   std::make_shared<DataTypeUInt64>(),   "size_in_bytes"},
+        {std::make_shared<ColumnUInt64>(),   std::make_shared<DataTypeUInt64>(),   "duration_ms"},
+
+        {std::make_shared<ColumnString>(),   std::make_shared<DataTypeString>(),   "database"},
+        {std::make_shared<ColumnString>(),   std::make_shared<DataTypeString>(),   "table"},
+        {std::make_shared<ColumnString>(),   std::make_shared<DataTypeString>(),   "part_name"},
+        {std::make_shared<ColumnArray>(std::make_shared<ColumnString>()),
+            std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "merged_from"},
+    };
+}
+
+
+void PartLogElement::appendToBlock(Block & block) const
+{
+    size_t i = 0;
+
+    /// The insertion order must match the column order in createBlock() exactly.
+    block.getByPosition(i++).column->insert(UInt64(event_type));
+    block.getByPosition(i++).column->insert(UInt64(DateLUT::instance().toDayNum(event_time)));
+    block.getByPosition(i++).column->insert(UInt64(event_time));
+
+    block.getByPosition(i++).column->insert(UInt64(size_in_bytes));
+    block.getByPosition(i++).column->insert(UInt64(duration_ms));
+
+    block.getByPosition(i++).column->insert(database_name);
+    block.getByPosition(i++).column->insert(table_name);
+    block.getByPosition(i++).column->insert(part_name);
+
+    Array merged_from_array;
+    merged_from_array.reserve(merged_from.size());
+    for (const auto & name : merged_from)
+        merged_from_array.push_back(name);
+    block.getByPosition(i++).column->insert(merged_from_array);
+}
+
+
+}
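appendToBlock() addresses columns strictly by position, so its insertion order must stay in lockstep with the declaration order in createBlock(). A hypothetical sanity check one could keep next to these two functions (a sketch, not part of the patch):

    #include <cassert>

    /// Verifies the nine-column layout that appendToBlock() assumes.
    void checkPartLogBlockLayout()
    {
        Block block = PartLogElement::createBlock();
        assert(block.columns() == 9);
        assert(block.getByPosition(0).name == "event_type");
        assert(block.getByPosition(1).name == "event_date");
        assert(block.getByPosition(8).name == "merged_from");
    }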
diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml
index e95988188a4..7f2a97be00a 100644
--- a/dbms/src/Server/config.xml
+++ b/dbms/src/Server/config.xml
@@ -135,6 +135,16 @@
+    <!-- Uncomment to log operations with data parts (creation, merge, download)
+         into the system.part_log table.
+    <part_log>
+        <database>system</database>
+        <table>part_log</table>
+
+        <flush_interval_milliseconds>7500</flush_interval_milliseconds>
+    </part_log>
+    -->
+
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index 85083543d68..145cf5a176d 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <DB/Interpreters/PartLog.h>
 #include
 #include
 
@@ -57,6 +58,7 @@ namespace ErrorCodes
 
 
 MergeTreeData::MergeTreeData(
+    const String & database_, const String & table_,
     const String & full_path_, NamesAndTypesListPtr columns_,
     const NamesAndTypesList & materialized_columns_,
     const NamesAndTypesList & alias_columns_,
@@ -71,12 +73,13 @@ MergeTreeData::MergeTreeData(
     bool require_part_metadata_,
     bool attach,
     BrokenPartCallback broken_part_callback_)
-    : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
+    : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
     date_column_name(date_column_name_), sampling_expression(sampling_expression_),
     index_granularity(index_granularity_),
     merging_params(merging_params_),
     settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
     require_part_metadata(require_part_metadata_),
+    database_name(database_), table_name(table_),
     full_path(full_path_), columns(columns_),
     broken_part_callback(broken_part_callback_),
     log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index 07b211b14cf..413ce1d630b 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -3,6 +3,9 @@
 #include
 #include
 #include
+#include <DB/Common/Stopwatch.h>
+#include <DB/Interpreters/Context.h>
+#include <DB/Interpreters/PartLog.h>
 #include
 
@@ -85,6 +88,11 @@ BlocksWithDateIntervals MergeTreeDataWriter::splitBlockIntoParts(const Block & b
 
 MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDateInterval & block_with_dates, Int64 temp_index)
 {
+    /// For logging.
+    Stopwatch stopwatch;
+    PartLogElement elem;
+    elem.event_time = time(0);
+
     Block & block = block_with_dates.block;
     UInt16 min_date = block_with_dates.min_date;
     UInt16 max_date = block_with_dates.max_date;
@@ -157,6 +165,20 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
     ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
     ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->size_in_bytes);
 
+    PartLog * part_log = context.getPartLog();
+    if (part_log)
+    {
+        elem.event_type = PartLogElement::NEW_PART;
+        elem.size_in_bytes = new_data_part->size_in_bytes;
+        elem.duration_ms = stopwatch.elapsed() / 1000000;    /// Stopwatch counts nanoseconds.
+
+        elem.database_name = new_data_part->storage.getDatabaseName();
+        elem.table_name = new_data_part->storage.getTableName();
+        elem.part_name = new_data_part->name;
+
+        part_log->add(elem);
+    }
+
     return new_data_part;
 }
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index 6d430b9c9d5..ae5fee99a0f 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -9,6 +9,7 @@
 #include
 #include
 #include
+#include <DB/Interpreters/PartLog.h>
 #include
 #include
 
@@ -43,15 +44,16 @@ StorageMergeTree::StorageMergeTree(
     const MergeTreeData::MergingParams & merging_params_,
     bool has_force_restore_data_flag,
     const MergeTreeSettings & settings_)
-    : IStorage{materialized_columns_, alias_columns_, column_defaults_},
+    : IStorage{materialized_columns_, alias_columns_, column_defaults_},
     path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
     context(context_), background_pool(context_.getBackgroundPool()),
-    data(full_path, columns_,
+    data(database_name, table_name,
+        full_path, columns_,
         materialized_columns_, alias_columns_, column_defaults_,
         context_, primary_expr_ast_, date_column_name_,
         sampling_expression_, index_granularity_, merging_params_,
         settings_, database_name_ + "." + table_name, false, attach),
-    reader(data), writer(data), merger(data, context.getBackgroundPool()),
+    reader(data), writer(data, context), merger(data, context.getBackgroundPool()),
     log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
 {
     data.loadDataParts(has_force_restore_data_flag);
@@ -333,11 +335,35 @@ bool StorageMergeTree::merge(
 
     MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, merged_name, merging_tagger->parts);
 
+    /// Logging.
+    PartLogElement elem;
+    Stopwatch stopwatch;
+    elem.event_time = time(0);
+
+    elem.merged_from.reserve(merging_tagger->parts.size());
+    for (const auto & part : merging_tagger->parts)
+        elem.merged_from.push_back(part->name);
+
     auto new_part = merger.mergePartsToTemporaryPart(
         merging_tagger->parts, merged_name, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get());
 
     merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr);
 
+    PartLog * part_log = context.getPartLog();
+    if (part_log)
+    {
+        elem.event_type = PartLogElement::MERGE_PARTS;
+        elem.size_in_bytes = new_part->size_in_bytes;
+
+        elem.database_name = new_part->storage.getDatabaseName();
+        elem.table_name = new_part->storage.getTableName();
+        elem.part_name = new_part->name;
+
+        elem.duration_ms = stopwatch.elapsed() / 1000000;    /// Stopwatch counts nanoseconds.
+
+        part_log->add(elem);
+    }
+
     return true;
 }
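Stopwatch::elapsed() returns nanoseconds, and each logging site divides by 1000000 by hand to get the milliseconds stored in duration_ms. A hypothetical helper (a sketch, not part of the patch) would keep that conversion in one place and make it harder to mistype:

    /// Nanoseconds from Stopwatch::elapsed() to the milliseconds of PartLogElement::duration_ms.
    inline UInt64 elapsedMilliseconds(const Stopwatch & stopwatch)
    {
        return stopwatch.elapsed() / 1000000;
    }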
+ table_name + " (StorageReplicatedMergeTree)")) { @@ -291,7 +293,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( String unreplicated_path = full_path + "unreplicated/"; if (Poco::File(unreplicated_path).exists()) { - unreplicated_data = std::make_unique(unreplicated_path, columns_, + unreplicated_data = std::make_unique( + database_name, table_name, + unreplicated_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, merging_params_, settings_, @@ -507,7 +511,7 @@ namespace in >> read_primary_key; /// NOTE: Можно сделать менее строгую проверку совпадения выражений, чтобы таблицы не ломались от небольших изменений - /// в коде formatAST. + /// в коде formatAST. if (read_primary_key != local_primary_key) throw Exception("Existing table metadata in ZooKeeper differs in primary key." " Stored in ZooKeeper: " + read_primary_key + ", local: " + local_primary_key, @@ -710,9 +714,9 @@ void StorageReplicatedMergeTree::createReplica() /** Если эталонная реплика еще не до конца создана, подождем. * NOTE: Если при ее создании что-то пошло не так, можем провисеть тут вечно. - * Можно создавать на время создания эфемерную ноду, чтобы быть уверенным, что реплика создается, а не заброшена. - * То же можно делать и для таблицы. Можно автоматически удалять ноду реплики/таблицы, - * если видно, что она создана не до конца, а создающий ее умер. + * Можно создавать на время создания эфемерную ноду, чтобы быть уверенным, что реплика создается, а не заброшена. + * То же можно делать и для таблицы. Можно автоматически удалять ноду реплики/таблицы, + * если видно, что она создана не до конца, а создающий ее умер. */ while (!zookeeper->exists(source_path + "/columns")) { @@ -1667,7 +1671,7 @@ bool StorageReplicatedMergeTree::canMergeParts( } else { - String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number); + String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number); String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number); if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED && @@ -1802,7 +1806,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts( /// Уберем больше не нужные отметки о несуществующих блоках. 
             for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number)
             {
-                zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
+                zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
                 zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
             }
         }
@@ -2035,9 +2039,27 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
 
     ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
 
+    Stopwatch stopwatch;
+    PartLogElement elem;
+    elem.event_time = time(0);
+
     MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
         part_name, replica_path, address.host, address.replication_port, to_detached);
 
+    PartLog * part_log = context.getPartLog();
+    if (part_log)
+    {
+        elem.event_type = PartLogElement::DOWNLOAD_PART;
+        elem.size_in_bytes = part->size_in_bytes;
+        elem.duration_ms = stopwatch.elapsed() / 1000000;    /// Stopwatch counts nanoseconds.
+
+        elem.database_name = part->storage.getDatabaseName();
+        elem.table_name = part->storage.getTableName();
+        elem.part_name = part->name;
+
+        part_log->add(elem);
+    }
+
     if (!to_detached)
     {
         zkutil::Ops ops;
@@ -2642,7 +2664,7 @@ void StorageReplicatedMergeTree::dropPartition(
      * This will prohibit merges of the parts being deleted with newly inserted data.
      * Invariant: merges of the parts being deleted with other parts will not appear in the log.
      * NOTE: If a DROP PART query ever needs similar support, a new mechanism will have to be invented for it
-     * to guarantee this invariant.
+     * to guarantee this invariant.
      */
 
     Int64 right;