Added part_log with statistics scripts (#549)

* Added part_log

* first_test

* filter and hits_res

* Add renamer and drawer

* Add columns database and table into PartLog

* Add normal way to get table_name and database_name from part

* improve drawer

* add stats for random size parts

* Merge converter and drawer

* make drawer more informative

* add new data

* add new data

* new data

* add long range stats

* for checking best way

* Add add_parts script

* Good style for global merge

* delete commented code

* Fixed spaces to tabs

* Note that Stopwatch is started automatically.

* Style

* Update StorageMergeTree.cpp

* Update StorageReplicatedMergeTree.cpp

* Switch act_time_ms to duration_ms

* Added ability to disable part_log

* fixed getPartLog

* fix usage getPartLog

* fix
This commit is contained in:
Ravengg 2017-03-07 21:13:54 +04:00 committed by alexey-milovidov
parent 293b8b958c
commit b079dacfd1
14 changed files with 399 additions and 25 deletions

View File

@ -47,6 +47,7 @@ class Macros;
struct Progress; struct Progress;
class Clusters; class Clusters;
class QueryLog; class QueryLog;
class PartLog;
struct MergeTreeSettings; struct MergeTreeSettings;
class IDatabase; class IDatabase;
class DDLGuard; class DDLGuard;
@ -282,6 +283,7 @@ public:
Compiler & getCompiler(); Compiler & getCompiler();
QueryLog & getQueryLog(); QueryLog & getQueryLog();
PartLog * getPartLog();
const MergeTreeSettings & getMergeTreeSettings(); const MergeTreeSettings & getMergeTreeSettings();
/// Prevents DROP TABLE if its size is greater than max_size (50GB by default, max_size=0 turn off this check) /// Prevents DROP TABLE if its size is greater than max_size (50GB by default, max_size=0 turn off this check)

View File

@ -0,0 +1,44 @@
#pragma once
#include <DB/Interpreters/SystemLog.h>
namespace DB
{
struct PartLogElement
{
enum Type
{
NEW_PART = 1,
MERGE_PARTS = 2,
DOWNLOAD_PART = 3,
};
Type event_type = NEW_PART;
time_t event_time{};
UInt64 size_in_bytes{};
UInt64 duration_ms{};
String database_name;
String table_name;
String part_name;
Strings merged_from;
static std::string name() { return "PartLog"; }
static Block createBlock();
void appendToBlock(Block & block) const;
};
/// Instead of typedef - to allow forward declaration.
class PartLog : public SystemLog<PartLogElement>
{
using SystemLog<PartLogElement>::SystemLog;
};
}

View File

@ -34,6 +34,7 @@ namespace ErrorCodes
extern const int TABLE_DIFFERS_TOO_MUCH; extern const int TABLE_DIFFERS_TOO_MUCH;
} }
/// Data structure for *MergeTree engines. /// Data structure for *MergeTree engines.
/// Merge tree is used for incremental sorting of data. /// Merge tree is used for incremental sorting of data.
/// The table consists of several sorted parts. /// The table consists of several sorted parts.
@ -77,7 +78,7 @@ namespace ErrorCodes
/// - MergeTreeDataWriter /// - MergeTreeDataWriter
/// - MergeTreeDataMerger /// - MergeTreeDataMerger
class MergeTreeData : public ITableDeclaration class MergeTreeData : public ITableDeclaration
{ {
friend class ReshardingWorker; friend class ReshardingWorker;
@ -229,7 +230,8 @@ public:
/// index_granularity - how many rows correspond to one primary key value. /// index_granularity - how many rows correspond to one primary key value.
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory. /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created. /// attach - whether the existing table is attached or the new table is created.
MergeTreeData( const String & full_path_, NamesAndTypesListPtr columns_, MergeTreeData( const String & database_, const String & table_,
const String & full_path_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
@ -261,11 +263,6 @@ public:
Int64 getMaxDataPartIndex(); Int64 getMaxDataPartIndex();
std::string getTableName() const override
{
throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR);
}
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
NameAndTypePair getColumn(const String & column_name) const override NameAndTypePair getColumn(const String & column_name) const override
@ -288,6 +285,10 @@ public:
|| column_name == "_sample_factor"; || column_name == "_sample_factor";
} }
String getDatabaseName() const { return database_name; }
String getTableName() const override { return table_name; }
String getFullPath() const { return full_path; } String getFullPath() const { return full_path; }
String getLogName() const { return log_name; } String getLogName() const { return log_name; }
@ -482,6 +483,8 @@ private:
ExpressionActionsPtr primary_expr; ExpressionActionsPtr primary_expr;
SortDescription sort_descr; SortDescription sort_descr;
String database_name;
String table_name;
String full_path; String full_path;
NamesAndTypesListPtr columns; NamesAndTypesListPtr columns;

View File

@ -6,6 +6,7 @@
#include <DB/Columns/ColumnsNumber.h> #include <DB/Columns/ColumnsNumber.h>
#include <DB/Interpreters/sortBlock.h> #include <DB/Interpreters/sortBlock.h>
#include <DB/Interpreters/Context.h>
#include <DB/Storages/MergeTree/MergeTreeData.h> #include <DB/Storages/MergeTree/MergeTreeData.h>
@ -40,7 +41,7 @@ using BlocksWithDateIntervals = std::list<BlockWithDateInterval>;
class MergeTreeDataWriter class MergeTreeDataWriter
{ {
public: public:
MergeTreeDataWriter(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (Writer)")) {} MergeTreeDataWriter(MergeTreeData & data_, Context & context_) : data(data_), context(context_), log(&Logger::get(data.getLogName() + " (Writer)")) {}
/** Split the block to blocks, each of them must be written as separate part. /** Split the block to blocks, each of them must be written as separate part.
* (split rows by months) * (split rows by months)
@ -56,6 +57,7 @@ public:
private: private:
MergeTreeData & data; MergeTreeData & data;
Context & context;
Logger * log; Logger * log;
}; };

View File

@ -0,0 +1,10 @@
#!/bin/bash
for (( i = 0; i < 1000; i++ )); do
if (( RANDOM % 10 )); then
clickhouse-client --port=9007 --query="INSERT INTO mt (x) SELECT rand64() AS x FROM system.numbers LIMIT 100000"
else
clickhouse-client --port=9007 --query="INSERT INTO mt (x) SELECT rand64() AS x FROM system.numbers LIMIT 300000"
fi
done

View File

@ -0,0 +1,76 @@
from __future__ import print_function
import argparse
import matplotlib.pyplot as plt
import ast
TMP_FILE='tmp.tsv'
def parse_args():
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-f', '--file', default='data.tsv')
cfg = parser.parse_args()
return cfg
def draw():
place = dict()
max_coord = 0
global_top = 0
for line in open(TMP_FILE):
numbers = line.split('\t')
if len(numbers) <= 2:
continue
name = numbers[-2]
if numbers[0] == '1':
dx = int(numbers[3])
max_coord += dx
place[name] = [1, max_coord, 1, dx]
max_coord += dx
plt.plot([max_coord - 2 * dx, max_coord], [1, 1])
for line in open(TMP_FILE):
numbers = line.split('\t')
if len(numbers) <= 2:
continue
name = numbers[-2]
if numbers[0] == '2':
list = ast.literal_eval(numbers[-1])
coord = [0,0,0,0]
for cur_name in list:
coord[0] = max(place[cur_name][0], coord[0])
coord[1] += place[cur_name][1] * place[cur_name][2]
coord[2] += place[cur_name][2]
coord[3] += place[cur_name][3]
coord[1] /= coord[2]
coord[0] += 1
global_top = max(global_top, coord[0])
place[name] = coord
for cur_name in list:
plt.plot([coord[1], place[cur_name][1]],[coord[0], place[cur_name][0]])
plt.plot([coord[1] - coord[3], coord[1] + coord[3]], [coord[0], coord[0]])
plt.plot([0], [global_top + 1])
plt.plot([0], [-1])
plt.show()
def convert(input_file):
print(input_file)
tmp_file = open(TMP_FILE, "w")
for line in open(input_file):
numbers = line.split('\t')
numbers2 = numbers[-2].split('_')
if numbers2[-2] == numbers2[-3]:
numbers2[-2] = str(int(numbers2[-2]) + 1)
numbers2[-3] = str(int(numbers2[-3]) + 1)
numbers[-2] = '_'.join(numbers2[1:])
print('\t'.join(numbers), end='', file=tmp_file)
else:
print(line, end='', file=tmp_file)
def main():
cfg = parse_args()
convert(cfg.file)
draw()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,61 @@
import time
import ast
from datetime import datetime
FILE='data.tsv'
def get_metrix():
data = []
time_to_merge = 0
count_of_parts = 0
max_count_of_parts = 0
parts_in_time = []
last_date = 0
for line in open(FILE):
fields = line.split('\t')
last_date = datetime.strptime(fields[2], '%Y-%m-%d %H:%M:%S')
break
for line in open(FILE):
fields = line.split('\t')
cur_date = datetime.strptime(fields[2], '%Y-%m-%d %H:%M:%S')
if fields[0] == '2':
time_to_merge += int(fields[4])
list = ast.literal_eval(fields[-1])
count_of_parts -= len(list) - 1
else:
count_of_parts += 1
if max_count_of_parts < count_of_parts:
max_count_of_parts = count_of_parts
parts_in_time.append([(cur_date-last_date).total_seconds(), count_of_parts])
last_date = cur_date
stats_parts_in_time = []
global_time = 0
average_parts = 0
for i in range(max_count_of_parts + 1):
stats_parts_in_time.append(0)
for elem in parts_in_time:
stats_parts_in_time[elem[1]] += elem[0]
global_time += elem[0]
average_parts += elem[0] * elem[1]
for i in range(max_count_of_parts):
stats_parts_in_time[i] /= global_time
average_parts /= global_time
return time_to_merge, max_count_of_parts, average_parts, stats_parts_in_time
def main():
time_to_merge, max_parts, average_parts, stats_parts = get_metrix()
print('time_to_merge=', time_to_merge)
print('max_parts=', max_parts)
print('average_parts=', average_parts)
print('stats_parts=', stats_parts)
if __name__ == '__main__':
main()

View File

@ -33,6 +33,7 @@
#include <DB/Interpreters/InterserverIOHandler.h> #include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/Interpreters/Compiler.h> #include <DB/Interpreters/Compiler.h>
#include <DB/Interpreters/QueryLog.h> #include <DB/Interpreters/QueryLog.h>
#include <DB/Interpreters/PartLog.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/IO/ReadBufferFromFile.h> #include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/UncompressedCache.h> #include <DB/IO/UncompressedCache.h>
@ -95,8 +96,8 @@ struct ContextShared
mutable zkutil::ZooKeeperPtr zookeeper; /// Клиент для ZooKeeper. mutable zkutil::ZooKeeperPtr zookeeper; /// Клиент для ZooKeeper.
String interserver_io_host; /// Имя хоста по которым это сервер доступен для других серверов. String interserver_io_host; /// Имя хоста по которым это сервер доступен для других серверов.
int interserver_io_port; /// и порт, int interserver_io_port; /// и порт,
String path; /// Путь к директории с данными, со слешем на конце. String path; /// Путь к директории с данными, со слешем на конце.
String tmp_path; /// Путь ко временным файлам, возникающим при обработке запроса. String tmp_path; /// Путь ко временным файлам, возникающим при обработке запроса.
@ -121,6 +122,7 @@ struct ContextShared
Macros macros; /// Substitutions extracted from config. Macros macros; /// Substitutions extracted from config.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary. std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
std::unique_ptr<QueryLog> query_log; /// Used to log queries. std::unique_ptr<QueryLog> query_log; /// Used to log queries.
std::unique_ptr<PartLog> part_log; /// Used to log operations with parts
/// Правила для выбора метода сжатия в зависимости от размера куска. /// Правила для выбора метода сжатия в зависимости от размера куска.
mutable std::unique_ptr<CompressionMethodSelector> compression_method_selector; mutable std::unique_ptr<CompressionMethodSelector> compression_method_selector;
std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines. std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
@ -1061,6 +1063,34 @@ QueryLog & Context::getQueryLog()
} }
PartLog * Context::getPartLog()
{
auto lock = getLock();
auto & config = Poco::Util::Application::instance().config();
if (!config.has("part_log"))
return nullptr;
if (!shared->part_log)
{
if (shared->shutdown_called)
throw Exception("Will not get part_log because shutdown was called", ErrorCodes::LOGICAL_ERROR);
if (!global_context)
throw Exception("Logical error: no global context for part log", ErrorCodes::LOGICAL_ERROR);
String database = config.getString("part_log.database", "system");
String table = config.getString("part_log.table", "part_log");
size_t flush_interval_milliseconds = parse<size_t>(
config.getString("part_log.flush_interval_milliseconds", DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS_STR));
shared->part_log = std::make_unique<PartLog>(
*global_context, database, table, "MergeTree(event_date, event_time, 1024)", flush_interval_milliseconds);
}
return shared->part_log.get();
}
CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part_size_ratio) const CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part_size_ratio) const
{ {
auto lock = getLock(); auto lock = getLock();

View File

@ -0,0 +1,63 @@
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/Interpreters/PartLog.h>
#include <common/ClickHouseRevision.h>
#include <Poco/Net/IPAddress.h>
#include <array>
namespace DB
{
Block PartLogElement::createBlock()
{
return
{
{std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "event_type"},
{std::make_shared<ColumnUInt16>(), std::make_shared<DataTypeDate>(), "event_date"},
{std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "size_in_bytes"},
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "duration_ms"},
{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "database"},
{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "table"},
{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "part_name"},
{std::make_shared<ColumnArray>(std::make_shared<ColumnString>()),
std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "merged_from"},
};
}
void PartLogElement::appendToBlock(Block & block) const
{
size_t i = 0;
block.getByPosition(i++).column->insert(UInt64(event_type));
block.getByPosition(i++).column->insert(UInt64(DateLUT::instance().toDayNum(event_time)));
block.getByPosition(i++).column->insert(UInt64(event_time));
block.getByPosition(i++).column->insert(UInt64(size_in_bytes));
block.getByPosition(i++).column->insert(UInt64(duration_ms));
block.getByPosition(i++).column->insert(database_name);
block.getByPosition(i++).column->insert(table_name);
block.getByPosition(i++).column->insert(part_name);
Array merged_from_array;
merged_from_array.reserve(merged_from.size());
for (const auto & name : merged_from)
merged_from_array.push_back(name);
block.getByPosition(i++).column->insert(merged_from_array);
}
}

View File

@ -135,6 +135,16 @@
</query_log> </query_log>
<!-- Uncomment if use part_log
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</part_log>
-->
<!-- Parameters for embedded dictionaries, used in Yandex.Metrica. <!-- Parameters for embedded dictionaries, used in Yandex.Metrica.
See https://clickhouse.yandex/reference_en.html#Internal%20dictionaries See https://clickhouse.yandex/reference_en.html#Internal%20dictionaries
--> -->

View File

@ -25,6 +25,7 @@
#include <DB/Common/Increment.h> #include <DB/Common/Increment.h>
#include <DB/Common/escapeForFileName.h> #include <DB/Common/escapeForFileName.h>
#include <DB/Common/StringUtils.h> #include <DB/Common/StringUtils.h>
#include <DB/Common/Stopwatch.h>
#include <DB/IO/Operators.h> #include <DB/IO/Operators.h>
#include <algorithm> #include <algorithm>
@ -57,6 +58,7 @@ namespace ErrorCodes
MergeTreeData::MergeTreeData( MergeTreeData::MergeTreeData(
const String & database_, const String & table_,
const String & full_path_, NamesAndTypesListPtr columns_, const String & full_path_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
@ -71,12 +73,13 @@ MergeTreeData::MergeTreeData(
bool require_part_metadata_, bool require_part_metadata_,
bool attach, bool attach,
BrokenPartCallback broken_part_callback_) BrokenPartCallback broken_part_callback_)
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_), : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_), date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_), index_granularity(index_granularity_),
merging_params(merging_params_), merging_params(merging_params_),
settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr), settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
require_part_metadata(require_part_metadata_), require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
full_path(full_path_), columns(columns_), full_path(full_path_), columns(columns_),
broken_part_callback(broken_part_callback_), broken_part_callback(broken_part_callback_),
log_name(log_name_), log(&Logger::get(log_name + " (Data)")) log_name(log_name_), log(&Logger::get(log_name + " (Data)"))

View File

@ -3,6 +3,9 @@
#include <DB/Common/escapeForFileName.h> #include <DB/Common/escapeForFileName.h>
#include <DB/DataTypes/DataTypeArray.h> #include <DB/DataTypes/DataTypeArray.h>
#include <DB/IO/HashingWriteBuffer.h> #include <DB/IO/HashingWriteBuffer.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Interpreters/PartLog.h>
#include <DB/Interpreters/Context.h>
#include <Poco/File.h> #include <Poco/File.h>
@ -85,6 +88,11 @@ BlocksWithDateIntervals MergeTreeDataWriter::splitBlockIntoParts(const Block & b
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDateInterval & block_with_dates, Int64 temp_index) MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDateInterval & block_with_dates, Int64 temp_index)
{ {
/// For logging
Stopwatch stopwatch;
PartLogElement elem;
elem.event_time = time(0);
Block & block = block_with_dates.block; Block & block = block_with_dates.block;
UInt16 min_date = block_with_dates.min_date; UInt16 min_date = block_with_dates.min_date;
UInt16 max_date = block_with_dates.max_date; UInt16 max_date = block_with_dates.max_date;
@ -157,6 +165,20 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->size_in_bytes); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->size_in_bytes);
PartLog * part_log = context.getPartLog();
if (part_log)
{
elem.event_type = PartLogElement::NEW_PART;
elem.size_in_bytes = new_data_part->size_in_bytes;
elem.duration_ms = stopwatch.elapsed() / 1000000;
elem.database_name = new_data_part->storage.getDatabaseName();
elem.table_name = new_data_part->storage.getTableName();
elem.part_name = new_data_part->name;
part_log->add(elem);
}
return new_data_part; return new_data_part;
} }

View File

@ -9,6 +9,7 @@
#include <DB/Common/escapeForFileName.h> #include <DB/Common/escapeForFileName.h>
#include <DB/Interpreters/InterpreterAlterQuery.h> #include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/ExpressionAnalyzer.h> #include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Interpreters/PartLog.h>
#include <DB/Parsers/ASTFunction.h> #include <DB/Parsers/ASTFunction.h>
#include <DB/Parsers/ASTSelectQuery.h> #include <DB/Parsers/ASTSelectQuery.h>
@ -43,15 +44,16 @@ StorageMergeTree::StorageMergeTree(
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag, bool has_force_restore_data_flag,
const MergeTreeSettings & settings_) const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, : IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'), path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
context(context_), background_pool(context_.getBackgroundPool()), context(context_), background_pool(context_.getBackgroundPool()),
data(full_path, columns_, data(database_name, table_name,
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, merging_params_, sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, false, attach), settings_, database_name_ + "." + table_name, false, attach),
reader(data), writer(data), merger(data, context.getBackgroundPool()), reader(data), writer(data, context), merger(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)")) log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
{ {
data.loadDataParts(has_force_restore_data_flag); data.loadDataParts(has_force_restore_data_flag);
@ -333,11 +335,35 @@ bool StorageMergeTree::merge(
MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, merged_name, merging_tagger->parts); MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, merged_name, merging_tagger->parts);
/// Logging
PartLogElement elem;
Stopwatch stopwatch;
elem.event_time = time(0);
elem.merged_from.reserve(merging_tagger->parts.size());
for (const auto & part : merging_tagger->parts)
elem.merged_from.push_back(part->name);
auto new_part = merger.mergePartsToTemporaryPart( auto new_part = merger.mergePartsToTemporaryPart(
merging_tagger->parts, merged_name, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get()); merging_tagger->parts, merged_name, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get());
merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr); merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr);
PartLog * part_log = context.getPartLog();
if (part_log)
{
elem.event_type = PartLogElement::MERGE_PARTS;
elem.size_in_bytes = new_part->size_in_bytes;
elem.database_name = new_part->storage.getDatabaseName();
elem.table_name = new_part->storage.getTableName();
elem.part_name = new_part->name;
elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log->add(elem);
}
return true; return true;
} }

View File

@ -23,6 +23,7 @@
#include <DB/IO/Operators.h> #include <DB/IO/Operators.h>
#include <DB/Interpreters/InterpreterAlterQuery.h> #include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/PartLog.h>
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h> #include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
#include <DB/DataStreams/RemoteBlockInputStream.h> #include <DB/DataStreams/RemoteBlockInputStream.h>
@ -207,18 +208,19 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag, bool has_force_restore_data_flag,
const MergeTreeSettings & settings_) const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_), : IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
current_zookeeper(context.getZooKeeper()), database_name(database_name_), current_zookeeper(context.getZooKeeper()), database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
zookeeper_path(context.getMacros().expand(zookeeper_path_)), zookeeper_path(context.getMacros().expand(zookeeper_path_)),
replica_name(context.getMacros().expand(replica_name_)), replica_name(context.getMacros().expand(replica_name_)),
data(full_path, columns_, data(database_name, table_name,
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, merging_params_, sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, true, attach, settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); }), [this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this), reader(data), writer(data, context), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this),
shutdown_event(false), part_check_thread(*this), shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")) log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{ {
@ -291,7 +293,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
String unreplicated_path = full_path + "unreplicated/"; String unreplicated_path = full_path + "unreplicated/";
if (Poco::File(unreplicated_path).exists()) if (Poco::File(unreplicated_path).exists())
{ {
unreplicated_data = std::make_unique<MergeTreeData>(unreplicated_path, columns_, unreplicated_data = std::make_unique<MergeTreeData>(
database_name, table_name,
unreplicated_path, columns_,
materialized_columns_, alias_columns_, column_defaults_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, context_, primary_expr_ast_,
date_column_name_, sampling_expression_, index_granularity_, merging_params_, settings_, date_column_name_, sampling_expression_, index_granularity_, merging_params_, settings_,
@ -507,7 +511,7 @@ namespace
in >> read_primary_key; in >> read_primary_key;
/// NOTE: Можно сделать менее строгую проверку совпадения выражений, чтобы таблицы не ломались от небольших изменений /// NOTE: Можно сделать менее строгую проверку совпадения выражений, чтобы таблицы не ломались от небольших изменений
/// в коде formatAST. /// в коде formatAST.
if (read_primary_key != local_primary_key) if (read_primary_key != local_primary_key)
throw Exception("Existing table metadata in ZooKeeper differs in primary key." throw Exception("Existing table metadata in ZooKeeper differs in primary key."
" Stored in ZooKeeper: " + read_primary_key + ", local: " + local_primary_key, " Stored in ZooKeeper: " + read_primary_key + ", local: " + local_primary_key,
@ -710,9 +714,9 @@ void StorageReplicatedMergeTree::createReplica()
/** Если эталонная реплика еще не до конца создана, подождем. /** Если эталонная реплика еще не до конца создана, подождем.
* NOTE: Если при ее создании что-то пошло не так, можем провисеть тут вечно. * NOTE: Если при ее создании что-то пошло не так, можем провисеть тут вечно.
* Можно создавать на время создания эфемерную ноду, чтобы быть уверенным, что реплика создается, а не заброшена. * Можно создавать на время создания эфемерную ноду, чтобы быть уверенным, что реплика создается, а не заброшена.
* То же можно делать и для таблицы. Можно автоматически удалять ноду реплики/таблицы, * То же можно делать и для таблицы. Можно автоматически удалять ноду реплики/таблицы,
* если видно, что она создана не до конца, а создающий ее умер. * если видно, что она создана не до конца, а создающий ее умер.
*/ */
while (!zookeeper->exists(source_path + "/columns")) while (!zookeeper->exists(source_path + "/columns"))
{ {
@ -1667,7 +1671,7 @@ bool StorageReplicatedMergeTree::canMergeParts(
} }
else else
{ {
String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number); String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number); String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED && if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
@ -1802,7 +1806,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
/// Уберем больше не нужные отметки о несуществующих блоках. /// Уберем больше не нужные отметки о несуществующих блоках.
for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number) for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number)
{ {
zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number)); zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number)); zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
} }
} }
@ -2035,9 +2039,27 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
Stopwatch stopwatch;
PartLogElement elem;
elem.event_time = time(0);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart( MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
part_name, replica_path, address.host, address.replication_port, to_detached); part_name, replica_path, address.host, address.replication_port, to_detached);
PartLog * part_log = context.getPartLog();
if (part_log)
{
elem.event_type = PartLogElement::DOWNLOAD_PART;
elem.size_in_bytes = part->size_in_bytes;
elem.duration_ms = stopwatch.elapsed() / 10000000;
elem.database_name = part->storage.getDatabaseName();
elem.table_name = part->storage.getTableName();
elem.part_name = part->name;
part_log->add(elem);
}
if (!to_detached) if (!to_detached)
{ {
zkutil::Ops ops; zkutil::Ops ops;
@ -2642,7 +2664,7 @@ void StorageReplicatedMergeTree::dropPartition(
* Это запретит мерджи удаляемых кусков с новыми вставляемыми данными. * Это запретит мерджи удаляемых кусков с новыми вставляемыми данными.
* Инвариант: в логе не появятся слияния удаляемых кусков с другими кусками. * Инвариант: в логе не появятся слияния удаляемых кусков с другими кусками.
* NOTE: Если понадобится аналогично поддержать запрос DROP PART, для него придется придумать какой-нибудь новый механизм, * NOTE: Если понадобится аналогично поддержать запрос DROP PART, для него придется придумать какой-нибудь новый механизм,
* чтобы гарантировать этот инвариант. * чтобы гарантировать этот инвариант.
*/ */
Int64 right; Int64 right;