Merge pull request #233 from yandex/metrics_refinement

Performance metrics refinement
alexey-milovidov 2016-12-12 20:56:30 +04:00 committed by GitHub
commit eb3e2c1695
23 changed files with 486 additions and 1251 deletions


@ -46,32 +46,34 @@ set (COMMON_WARNING_FLAGS "-Wall -Werror")
set (CXX_WARNING_FLAGS "-Wnon-virtual-dtor")
set (CXX11_ABI "ENABLE" CACHE STRING "Use C++11 ABI: DEFAULT, ENABLE, DISABLE")
set (TEST_COVERAGE FALSE CACHE BOOL "Enables flags for test coverage")
set (ENABLE_TESTS TRUE CACHE BOOL "Enables tests")
option (TEST_COVERAGE "Enables flags for test coverage" OFF)
option (ENABLE_TESTS "Enables tests" ON)
set (USE_STATIC_LIBRARIES TRUE CACHE BOOL "Set to FALSE to use shared libraries")
option (USE_STATIC_LIBRARIES "Set to FALSE to use shared libraries" ON)
if (NOT $ENV{USE_STATIC_LIBRARIES})
set (USE_STATIC_LIBRARIES FALSE)
endif ()
set (USE_INTERNAL_BOOST_LIBRARY TRUE CACHE BOOL "Set to FALSE to use system boost library instead of bundled")
if (NOT $ENV{USE_INTERNAL_BOOST_LIBRARY})
set (USE_INTERNAL_BOOST_LIBRARY FALSE)
endif ()
set (GLIBC_COMPATIBILITY FALSE CACHE BOOL "Set to TRUE to enable compatibility with older glibc libraries. Note that it is not compatible with ASan.")
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Note that it is not compatible with ASan." OFF)
if ($ENV{GLIBC_COMPATIBILITY})
set (GLIBC_COMPATIBILITY TRUE)
endif ()
set (ENABLE_LIBTCMALLOC TRUE CACHE BOOL "Set to TRUE to enable libtcmalloc.")
option (ENABLE_LIBTCMALLOC "Set to TRUE to enable libtcmalloc." ON)
if (NOT $ENV{ENABLE_LIBTCMALLOC})
set (ENABLE_LIBTCMALLOC FALSE)
endif ()
set (DEBUG_LIBTCMALLOC FALSE CACHE BOOL "Set to TRUE to use debug version of libtcmalloc.")
option (DEBUG_LIBTCMALLOC "Set to TRUE to use debug version of libtcmalloc." OFF)
if ($ENV{DEBUG_LIBTCMALLOC})
set (ENABLE_LIBTCMALLOC TRUE)
set (DEBUG_LIBTCMALLOC TRUE)
endif ()
if (GLIBC_COMPATIBILITY)


@ -0,0 +1,13 @@
macro(add_glob cur_list)
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${ARGN})
list(APPEND ${cur_list} ${__tmp})
endmacro()
macro(add_headers_and_sources prefix common_path)
add_glob(${prefix}_headers RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/DB/${common_path}/*.h include/DB/${common_path}/*.inl)
add_glob(${prefix}_sources src/${common_path}/*.cpp src/${common_path}/*.h)
endmacro()
macro(add_headers_only prefix common_path)
add_glob(${prefix}_headers RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/DB/${common_path}/*.h src/${common_path}/*.h)
endmacro()


@ -43,7 +43,7 @@ add_library (re2_st ${re2_headers})
set_target_properties (re2_st PROPERTIES COMPILE_DEFINITIONS "NO_THREADS;re2=re2_st")
message ("Creating headers for re2_st library.")
message (STATUS "Creating headers for re2_st library.")
file (MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/re2_st)
foreach (FILENAME filtered_re2.h re2.h set.h stringpiece.h variadic_function.h)
file (READ ${CMAKE_CURRENT_SOURCE_DIR}/re2/${FILENAME} CONTENT)


@ -47,7 +47,7 @@ FILE(READ ${LIBRARY_DIR}/zstd.h HEADER_CONTENT)
# Parse version
GetLibraryVersion("${HEADER_CONTENT}" LIBVER_MAJOR LIBVER_MINOR LIBVER_RELEASE)
MESSAGE("ZSTD VERSION ${LIBVER_MAJOR}.${LIBVER_MINOR}.${LIBVER_RELEASE}")
MESSAGE(STATUS "ZSTD VERSION ${LIBVER_MAJOR}.${LIBVER_MINOR}.${LIBVER_RELEASE}")
SET(Sources
${LIBRARY_DIR}/common/entropy_common.c

File diff suppressed because it is too large.


@ -24,15 +24,15 @@ struct BlockIO
Block in_sample; /// Sample block that will be read from 'in'.
Block out_sample; /// Sample block that will be written to 'out'.
/// Callbacks for query logging can be set here.
std::function<void(IBlockInputStream *)> finish_callback;
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
/// Call these functions if you need to log the query.
void onFinish()
{
if (finish_callback)
finish_callback(in.get());
finish_callback(in.get(), out.get());
}
void onException()
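For illustration, a hypothetical caller-side sketch (not part of this commit) of the widened callback; with both streams passed in, an INSERT-only query, where 'in' is null, can still report output-side metrics:

res.finish_callback = [] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out)
{
    /// Either pointer may be null: a SELECT fills 'stream_in', a plain INSERT fills 'stream_out'.
    if (auto counting = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
    {
        const Progress & p = counting->getProgress();  /// rows/bytes written by the INSERT
        /// ... log p.rows and p.bytes ...
    }
};
res.onFinish();  /// invokes finish_callback(in.get(), out.get())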


@ -0,0 +1,62 @@
#pragma once
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/ProcessList.h>
namespace DB
{
/// Proxy class that counts the number of written blocks, rows, and bytes
class CountingBlockOutputStream : public IBlockOutputStream
{
public:
CountingBlockOutputStream(const BlockOutputStreamPtr & stream_)
: stream(stream_) {}
void setProgressCallback(ProgressCallback callback)
{
progress_callback = callback;
}
void setProcessListElement(ProcessListElement * elem)
{
process_elem = elem;
}
const Progress & getProgress() const
{
return progress;
}
void write(const Block & block) override
{
stream->write(block);
Progress local_progress(block.rowsInFirstColumn(), block.bytes(), 0);
progress.incrementPiecewiseAtomically(local_progress);
if (process_elem)
process_elem->updateProgressOut(local_progress);
if (progress_callback)
progress_callback(local_progress);
}
void writePrefix() override { stream->writePrefix(); }
void writeSuffix() override { stream->writeSuffix(); }
void flush() override { stream->flush(); }
void onProgress(const Progress & progress) override { stream->onProgress(progress); }
String getContentType() const override { return stream->getContentType(); }
protected:
BlockOutputStreamPtr stream;
Progress progress;
ProgressCallback progress_callback;
ProcessListElement * process_elem = nullptr;
};
}
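A minimal usage sketch (hypothetical, mirroring what InterpreterInsertQuery::execute() does later in this diff): wrap any IBlockOutputStream, and every write() is both forwarded and accounted:

auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
out_wrapper->write(block);  /// forwards to 'out', then adds the block's rows/bytes to 'progress'
const Progress & written = out_wrapper->getProgress();  /// now includes 'block'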


@ -0,0 +1,41 @@
#pragma once
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Interpreters/Context.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/BlockIO.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/** Prepares an input stream that produces the data contained in an INSERT query.
* The head of the inserted data may be stored directly in the INSERT AST;
* the remaining (tail) data may be stored in input_buffer_tail_part.
*/
class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
Block readImpl() override { return res_stream->read(); }
void readPrefixImpl() override { return res_stream->readPrefix(); }
void readSuffixImpl() override { return res_stream->readSuffix(); }
String getName() const override { return "InputStreamFromASTInsertQuery"; }
String getID() const override { return "InputStreamFromASTInsertQuery(" + toString(this) + ")"; }
private:
std::unique_ptr<ReadBuffer> input_buffer_ast_part;
std::unique_ptr<ReadBuffer> input_buffer_contacenated;
BlockInputStreamPtr res_stream;
};
}
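Intended usage is a two-liner; this sketch is lifted from the refactored executeQuery() later in this diff ('ast', 'istr', 'streams' and 'context' come from that scope):

InputStreamFromASTInsertQuery in(ast, istr, streams, context);
copyData(in, *streams.out);  /// reads the inline AST data first, then the tail of the buffer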


@ -37,9 +37,11 @@ struct ProcessInfo
{
String query;
double elapsed_seconds;
size_t rows;
size_t bytes;
size_t read_rows;
size_t read_bytes;
size_t total_rows;
size_t written_rows;
size_t written_bytes;
Int64 memory_usage;
ClientInfo client_info;
};
@ -53,7 +55,10 @@ struct ProcessListElement
Stopwatch watch;
Progress progress;
/// Progress of input stream
Progress progress_in;
/// Progress of output stream
Progress progress_out;
MemoryTracker memory_tracker;
@ -88,9 +93,9 @@ struct ProcessListElement
current_memory_tracker = nullptr;
}
bool update(const Progress & value)
bool updateProgressIn(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
progress_in.incrementPiecewiseAtomically(value);
if (priority_handle)
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable.
@ -98,6 +103,13 @@ struct ProcessListElement
return !is_cancelled;
}
bool updateProgressOut(const Progress & value)
{
progress_out.incrementPiecewiseAtomically(value);
return !is_cancelled;
}
ProcessInfo getInfo() const
{
ProcessInfo res;
@ -105,9 +117,11 @@ struct ProcessListElement
res.query = query;
res.client_info = client_info;
res.elapsed_seconds = watch.elapsedSeconds();
res.rows = progress.rows;
res.bytes = progress.bytes;
res.total_rows = progress.total_rows;
res.read_rows = progress_in.rows;
res.read_bytes = progress_in.bytes;
res.total_rows = progress_in.total_rows;
res.written_rows = progress_out.rows;
res.written_bytes = progress_out.bytes;
res.memory_usage = memory_tracker.get();
return res;
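A sketch of how the two counters are fed; both call sites appear later in this diff:

/// Input side (IProfilingBlockInputStream::progressImpl):
if (!process_list_elem->updateProgressIn(value))
    cancel();  /// the query was cancelled

/// Output side (CountingBlockOutputStream::write):
Progress local_progress(block.rowsInFirstColumn(), block.bytes(), 0);
process_list_elem->updateProgressOut(local_progress);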


@ -7,14 +7,13 @@ namespace DB
{
/** Allows logging information about query execution:
* - the start of query execution;
* - performance metrics, after the query has executed;
* - errors during query execution.
/** Allows logging information about query execution:
* - info about the start of query execution;
* - performance metrics (set at the end of query execution);
* - info about errors during query execution.
*/
/** What to log.
*/
/// A struct that will be inserted as a row into the query_log table
struct QueryLogElement
{
enum Type
@ -27,15 +26,21 @@ struct QueryLogElement
Type type = QUERY_START;
/// Depending on the type, not all fields may be filled.
/// Depending on the type of query and stage, not all fields may be filled.
time_t event_time{};
time_t query_start_time{};
UInt64 query_duration_ms{};
/// The data fetched from DB to execute the query
UInt64 read_rows{};
UInt64 read_bytes{};
/// The data written to DB
UInt64 written_rows{};
UInt64 written_bytes{};
/// The data sent to the client
UInt64 result_rows{};
UInt64 result_bytes{};
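As a sketch, condensed from the finish_callback in executeQueryImpl() later in this diff, the three groups are populated as follows:

elem.read_rows     = process_list_elem->progress_in.rows;      /// read from tables
elem.read_bytes    = process_list_elem->progress_in.bytes;
elem.written_rows  = process_list_elem->progress_out.rows;     /// written by INSERT
elem.written_bytes = process_list_elem->progress_out.bytes;
elem.result_rows   = profiling_stream->getProfileInfo().rows;  /// sent to the client
elem.result_bytes  = profiling_stream->getProfileInfo().bytes;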


@ -34,20 +34,13 @@ struct MergeInfo
std::atomic<UInt64> bytes_read_uncompressed{};
std::atomic<UInt64> bytes_written_uncompressed{};
/// Updated only for the Horizontal algorithm.
/// For the Vertical algorithm, these are valid only for the primary key columns.
std::atomic<UInt64> rows_read{};
std::atomic<UInt64> rows_written{};
/// Updated only for the Vertical algorithm.
/// Mutually exclusive with rows_read and rows_written: either rows_written or columns_written is updated.
std::atomic<UInt64> columns_written{};
/// Updated in both cases
/// Number of rows for which primary key columns have been written
std::atomic<UInt64> rows_with_key_columns_read{};
std::atomic<UInt64> rows_with_key_columns_written{};
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
: database{database}, table{table}, result_part_name{result_part_name}
{
@ -66,9 +59,7 @@ struct MergeInfo
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
rows_read(other.rows_read.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed)),
columns_written(other.columns_written.load(std::memory_order_relaxed)),
rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)),
rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed))
columns_written(other.columns_written.load(std::memory_order_relaxed))
{
}
};


@ -98,6 +98,9 @@ struct MergeTreeSettings
/// Enable usage of Vertical merge algorithm.
size_t enable_vertical_merge_algorithm = 0;
/// Minimum (approximate) sum of rows in the merging parts needed to activate the Vertical merge algorithm
size_t vertical_merge_algorithm_min_rows_to_activate = 16 * DEFAULT_MERGE_BLOCK_SIZE;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
@ -133,6 +136,7 @@ struct MergeTreeSettings
SET_SIZE_T(min_relative_delay_to_close);
SET_SIZE_T(min_absolute_delay_to_close);
SET_SIZE_T(enable_vertical_merge_algorithm);
SET_SIZE_T(vertical_merge_algorithm_min_rows_to_activate);
#undef SET_SIZE_T
#undef SET_DOUBLE
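The body of SET_SIZE_T is outside this hunk; a plausible expansion for the new setting, assuming the usual Poco::Util::AbstractConfiguration accessors, would be:

if (config.has(config_elem + ".vertical_merge_algorithm_min_rows_to_activate"))
    vertical_merge_algorithm_min_rows_to_activate =
        config.getUInt64(config_elem + ".vertical_merge_algorithm_min_rows_to_activate");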


@ -86,6 +86,7 @@
\
M(MergedRows) \
M(MergedUncompressedBytes) \
M(MergesTime) \
\
M(MergeTreeDataWriterRows) \
M(MergeTreeDataWriterUncompressedBytes) \


@ -6,7 +6,6 @@
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
@ -228,15 +227,15 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
if (process_list_elem)
{
if (!process_list_elem->update(value))
if (!process_list_elem->updateProgressIn(value))
cancel();
/// The total amount of data processed or expected to be processed across all leaf sources, possibly on remote servers.
size_t rows_processed = process_list_elem->progress.rows;
size_t bytes_processed = process_list_elem->progress.bytes;
size_t rows_processed = process_list_elem->progress_in.rows;
size_t bytes_processed = process_list_elem->progress_in.bytes;
size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress.total_rows.load(std::memory_order_relaxed));
size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress_in.total_rows.load(std::memory_order_relaxed));
/** Check the limits on the amount of data to read, the query execution speed, and the quota on the amount of data to read.
* NOTE: Maybe it makes sense to check them directly in ProcessList?
@ -270,7 +269,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
size_t total_rows = process_list_elem->progress.total_rows;
size_t total_rows = process_list_elem->progress_in.total_rows;
if (limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
{
@ -283,7 +282,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
size_t total_rows = process_list_elem->progress.total_rows;
size_t total_rows = process_list_elem->progress_in.total_rows;
/// If the predicted execution time is greater than max_execution_time.
if (limits.max_execution_time != 0 && total_rows)
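For reference, a sketch of the speed guard this function implements (field names as in the surrounding hunk; 'info' is the stream's BlockStreamProfileInfo, and the exact guard conditions are assumptions):

double total_elapsed = info.total_stopwatch.elapsedSeconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalSeconds()
    && rows_processed / total_elapsed < limits.min_execution_speed)
    throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
        + " rows/sec., minimum: " + toString(limits.min_execution_speed),
        ErrorCodes::TOO_SLOW);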


@ -0,0 +1,37 @@
#include <DB/DataStreams/InputStreamFromASTInsertQuery.h>
namespace DB
{
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
if (!ast_insert_query)
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
String format = ast_insert_query->format;
if (format.empty())
format = "Values";
/// Data may be present both in the already parsed part of the query (ast_insert_query.data) and in the not yet parsed part (input_buffer_tail_part).
input_buffer_ast_part = std::make_unique<ReadBuffer>(
const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
buffers.push_back(&input_buffer_tail_part);
/** NOTE: Must not read from 'input_buffer_tail_part' before reading everything between 'ast_insert_query.data' and 'ast_insert_query.end',
* because 'query.data' may refer to a memory region used as the buffer for 'input_buffer_tail_part'.
*/
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out_sample, context.getSettings().max_insert_block_size);
}
}
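The ordering note above is the crux. A stripped-down illustration of the concatenation (hypothetical standalone names):

ReadBuffer head(const_cast<char *>(begin), end - begin, 0);  /// bytes already parsed into the AST
ConcatReadBuffer::ReadBuffers buffers{&head, &tail};         /// 'tail' is the network buffer
ConcatReadBuffer all(buffers);
/// Reads drain 'head' completely before touching 'tail', which keeps this safe even
/// when 'head' points into memory that is reused as the buffer for 'tail'.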


@ -6,6 +6,7 @@
#include <DB/DataStreams/PushingToViewsBlockOutputStream.h>
#include <DB/DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DB/DataStreams/SquashingBlockOutputStream.h>
#include <DB/DataStreams/CountingBlockOutputStream.h>
#include <DB/DataStreams/NullableAdapterBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
@ -102,6 +103,10 @@ BlockIO InterpreterInsertQuery::execute()
context.getSettingsRef().min_insert_block_size_rows,
context.getSettingsRef().min_insert_block_size_bytes);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
out = std::move(out_wrapper);
BlockIO res;
res.out_sample = getSampleBlock();


@ -28,6 +28,9 @@ Block QueryLogElement::createBlock()
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "read_rows"},
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "read_bytes"},
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "written_rows"},
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "written_bytes"},
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "result_rows"},
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "result_bytes"},
@ -102,6 +105,9 @@ void QueryLogElement::appendToBlock(Block & block) const
block.unsafeGetByPosition(i++).column->insert(UInt64(read_rows));
block.unsafeGetByPosition(i++).column->insert(UInt64(read_bytes));
block.unsafeGetByPosition(i++).column->insert(UInt64(written_rows));
block.unsafeGetByPosition(i++).column->insert(UInt64(written_bytes));
block.unsafeGetByPosition(i++).column->insert(UInt64(result_rows));
block.unsafeGetByPosition(i++).column->insert(UInt64(result_bytes));


@ -6,6 +6,8 @@
#include <DB/DataStreams/BlockIO.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/InputStreamFromASTInsertQuery.h>
#include <DB/DataStreams/CountingBlockOutputStream.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h>
@ -191,13 +193,21 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (res.in)
{
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
if (auto stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
}
}
if (res.out)
{
if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
{
stream->setProcessListElement(context.getProcessListElement());
}
}
/// Everything related to query log.
{
QueryLogElement elem;
@ -218,7 +228,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.getQueryLog().add(elem);
/// Also make it possible for the caller to log a successful query finish and exceptions during execution.
res.finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream) mutable
res.finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
{
ProcessListElement * process_list_elem = context.getProcessListElement();
@ -232,22 +242,35 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.event_time = time(0);
elem.query_duration_ms = elapsed_seconds * 1000;
elem.read_rows = process_list_elem->progress.rows;
elem.read_bytes = process_list_elem->progress.bytes;
elem.read_rows = process_list_elem->progress_in.rows;
elem.read_bytes = process_list_elem->progress_in.bytes;
elem.written_rows = process_list_elem->progress_out.rows;
elem.written_bytes = process_list_elem->progress_out.bytes;
auto memory_usage = process_list_elem->memory_tracker.getPeak();
elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
if (stream)
if (stream_in)
{
if (IProfilingBlockInputStream * profiling_stream = dynamic_cast<IProfilingBlockInputStream *>(stream))
if (auto profiling_stream = dynamic_cast<const IProfilingBlockInputStream *>(stream_in))
{
const BlockStreamProfileInfo & info = profiling_stream->getProfileInfo();
/// NOTE: an INSERT SELECT query yields zero metrics here
elem.result_rows = info.rows;
elem.result_bytes = info.bytes;
}
}
else if (stream_out) /// used only for ordinary INSERT queries
{
if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
{
/// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.
elem.result_rows = counting_stream->getProgress().rows;
elem.result_bytes = counting_stream->getProgress().bytes;
}
}
if (elem.read_rows != 0)
{
@ -280,8 +303,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query_duration_ms = elapsed_seconds * 1000;
elem.read_rows = process_list_elem->progress.rows;
elem.read_bytes = process_list_elem->progress.bytes;
elem.read_rows = process_list_elem->progress_in.rows;
elem.read_bytes = process_list_elem->progress_in.bytes;
auto memory_usage = process_list_elem->memory_tracker.getPeak();
elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
@ -369,35 +392,8 @@ void executeQuery(
{
if (streams.out)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
if (!ast_insert_query)
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
String format = ast_insert_query->format;
if (format.empty())
format = "Values";
/// Data may be present both in the already parsed part of the query (ast_insert_query.data) and in the not yet parsed part (istr).
ConcatReadBuffer::ReadBuffers buffers;
ReadBuffer buf1(const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
if (ast_insert_query->data)
buffers.push_back(&buf1);
buffers.push_back(&istr);
/** NOTE: Must not read from 'istr' before reading everything between 'ast_insert_query.data' and 'ast_insert_query.end',
* because 'query.data' may refer to a memory region used as the buffer for 'istr'.
*/
ConcatReadBuffer data_istr(buffers);
BlockInputStreamPtr in{
context.getInputFormat(
format, data_istr, streams.out_sample, context.getSettings().max_insert_block_size)};
copyData(*in, *streams.out);
InputStreamFromASTInsertQuery in(ast, istr, streams, context);
copyData(in, *streams.out);
}
if (streams.in)
@ -410,7 +406,7 @@ void executeQuery(
BlockOutputStreamPtr out = context.getOutputFormat(format_name, ostr, streams.in_sample);
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
if (auto stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
{
/// NOTE Progress callback takes shared ownership of 'out'.
stream->setProgressCallback([out] (const Progress & progress) { out->onProgress(progress); });


@ -31,6 +31,7 @@ namespace ProfileEvents
{
extern const Event MergedRows;
extern const Event MergedUncompressedBytes;
extern const Event MergesTime;
}
namespace CurrentMetrics
@ -384,64 +385,76 @@ public:
}
};
/** Progress callback. Used by the Horizontal merger and by the first step of the Vertical merger.
* What it should update:
* - approximate progress;
* - the number of merged rows and their size (the subset of PK columns is used in the case of a Vertical merge);
* - time elapsed for the current merge.
*/
class MergeProgressCallback : public ProgressCallback
{
public:
MergeProgressCallback(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {}
MergeProgressCallback(MergeList::Entry & merge_entry_, UInt64 & watch_prev_elapsed_)
: merge_entry(merge_entry_), watch_prev_elapsed(watch_prev_elapsed_) {}
MergeProgressCallback(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlgorithm merge_alg_, size_t num_total_rows,
const ColumnSizeEstimator & column_sizes)
: merge_entry(merge_entry_), merge_alg(merge_alg_)
MergeProgressCallback(MergeList::Entry & merge_entry_, size_t num_total_rows, const ColumnSizeEstimator & column_sizes,
UInt64 & watch_prev_elapsed_, MergeTreeDataMerger::MergeAlgorithm merge_alg_ = MergeAlgorithm::Vertical)
: merge_entry(merge_entry_), watch_prev_elapsed(watch_prev_elapsed_), merge_alg(merge_alg_)
{
if (merge_alg == MergeAlgorithm::Horizontal)
average_elem_progress = 1.0 / num_total_rows;
else
average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows);
average_elem_progress = (merge_alg == MergeAlgorithm::Horizontal)
? 1.0 / num_total_rows
: column_sizes.keyColumnsProgress(1, num_total_rows);
updateWatch();
}
MergeList::Entry & merge_entry;
const MergeAlgorithm merge_alg{MergeAlgorithm::Vertical};
UInt64 & watch_prev_elapsed;
Float64 average_elem_progress;
const MergeAlgorithm merge_alg{MergeAlgorithm::Vertical};
void updateWatch()
{
UInt64 watch_curr_elapsed = merge_entry->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTime, watch_curr_elapsed - watch_prev_elapsed);
watch_prev_elapsed = watch_curr_elapsed;
}
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->rows_with_key_columns_read += value.rows;
if (merge_alg == MergeAlgorithm::Horizontal)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
updateWatch();
merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->rows_read += value.rows;
merge_entry->progress = average_elem_progress * merge_entry->rows_read;
}
else
{
merge_entry->progress = average_elem_progress * merge_entry->rows_with_key_columns_read;
}
};
};
/** Progress callback for the gathering step of a Vertical merge.
* Updates: approximate progress, the amount of merged bytes (TODO: the two-column case should be fixed), and elapsed time.
*/
class MergeProgressCallbackVerticalStep : public MergeProgressCallback
{
public:
MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact,
const ColumnSizeEstimator & column_sizes, const String & column_name)
: MergeProgressCallback(merge_entry_), initial_progress(merge_entry->progress)
const ColumnSizeEstimator & column_sizes, const String & column_name, UInt64 & watch_prev_elapsed_)
: MergeProgressCallback(merge_entry_, watch_prev_elapsed_), initial_progress(merge_entry->progress)
{
average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact);
updateWatch();
}
Float64 initial_progress;
/// NOTE: not thread-safe (kept like this so the callback remains copyable). OK for the current single-threaded use case.
size_t rows_read_internal{0};
size_t rows_read_internal{0}; // NOTE: not thread-safe (kept like this so the callback remains copyable). OK for the current single-threaded use case
void operator() (const Progress & value)
{
merge_entry->bytes_read_uncompressed += value.bytes;
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
updateWatch();
rows_read_internal += value.rows;
Float64 local_progress = average_elem_progress * rows_read_internal;
@ -449,6 +462,7 @@ public:
};
};
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
@ -514,6 +528,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
* Along the way, we compute the expression for sorting.
*/
BlockInputStreams src_streams;
UInt64 watch_prev_elapsed = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
@ -523,7 +538,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
part_path, DEFAULT_MERGE_BLOCK_SIZE, merging_column_names, data, parts[i],
MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(MergeProgressCallback{merge_entry, merge_alg, sum_input_rows_upper_bound, column_sizes});
input->setProgressCallback(
MergeProgressCallback{merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg});
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
@ -599,9 +615,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
rows_written += block.rows();
to.write(block);
if (merge_alg == MergeAlgorithm::Horizontal)
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
/// This update is not used for the VERTICAL algorithm, since it requires more accurate per-column updates
@ -621,7 +635,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
/// Gather ordinary columns
if (merge_alg == MergeAlgorithm::Vertical)
{
size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read;
size_t sum_input_rows_exact = merge_entry->rows_read;
merge_entry->columns_written = merging_column_names.size();
merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact);
@ -653,7 +667,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
false, true);
column_part_stream->setProgressCallback(
MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name});
MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed});
column_part_streams[part_num] = std::move(column_part_stream);
}
@ -715,7 +729,7 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
bool enough_ordinary_cols = data.getColumnNamesList().size() > data.getSortDescription().size();
bool enough_total_rows = sum_rows_upper_bound >= DEFAULT_MERGE_BLOCK_SIZE;
bool enough_total_rows = sum_rows_upper_bound >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_rows_to_activate;
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;
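The final decision is outside this hunk; judging by the flag names, it presumably reduces to something like:

bool use_vertical = data.context.getMergeTreeSettings().enable_vertical_merge_algorithm
    && enough_ordinary_cols && enough_total_rows && no_parts_overflow;
return use_vertical ? MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;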


@ -25,9 +25,7 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_written", std::make_shared<DataTypeUInt64>() },
{ "columns_written", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_read", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() }
{ "columns_written", std::make_shared<DataTypeUInt64>() }
}
{
}
@ -49,59 +47,26 @@ BlockInputStreams StorageSystemMerges::read(
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
ColumnWithTypeAndName col_database{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "database"};
ColumnWithTypeAndName col_table{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "table"};
ColumnWithTypeAndName col_elapsed{std::make_shared<ColumnFloat64>(), std::make_shared<DataTypeFloat64>(), "elapsed"};
ColumnWithTypeAndName col_progress{std::make_shared<ColumnFloat64>(), std::make_shared<DataTypeFloat64>(), "progress"};
ColumnWithTypeAndName col_num_parts{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "num_parts"};
ColumnWithTypeAndName col_result_part_name{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "result_part_name"};
ColumnWithTypeAndName col_total_size_bytes_compressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "total_size_bytes_compressed"};
ColumnWithTypeAndName col_total_size_marks{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "total_size_marks"};
ColumnWithTypeAndName col_bytes_read_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_read_uncompressed"};
ColumnWithTypeAndName col_rows_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_read"};
ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_written_uncompressed"};
ColumnWithTypeAndName col_rows_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_written"};
ColumnWithTypeAndName col_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "columns_written"};
ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_read"};
ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"};
Block block = getSampleBlock();
for (const auto & merge : context.getMergeList().get())
{
col_database.column->insert(merge.database);
col_table.column->insert(merge.table);
col_elapsed.column->insert(merge.watch.elapsedSeconds());
col_progress.column->insert(std::min(1., merge.progress)); /// little cheat
col_num_parts.column->insert(merge.num_parts);
col_result_part_name.column->insert(merge.result_part_name);
col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed);
col_total_size_marks.column->insert(merge.total_size_marks);
col_bytes_read_uncompressed.column->insert(merge.bytes_read_uncompressed.load(std::memory_order_relaxed));
col_rows_read.column->insert(merge.rows_read.load(std::memory_order_relaxed));
col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed));
col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed));
col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed));
col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed));
col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed));
size_t i = 0;
block.unsafeGetByPosition(i++).column->insert(merge.database);
block.unsafeGetByPosition(i++).column->insert(merge.table);
block.unsafeGetByPosition(i++).column->insert(merge.watch.elapsedSeconds());
block.unsafeGetByPosition(i++).column->insert(std::min(1., merge.progress)); /// little cheat
block.unsafeGetByPosition(i++).column->insert(merge.num_parts);
block.unsafeGetByPosition(i++).column->insert(merge.result_part_name);
block.unsafeGetByPosition(i++).column->insert(merge.total_size_bytes_compressed);
block.unsafeGetByPosition(i++).column->insert(merge.total_size_marks);
block.unsafeGetByPosition(i++).column->insert(merge.bytes_read_uncompressed.load(std::memory_order_relaxed));
block.unsafeGetByPosition(i++).column->insert(merge.rows_read.load(std::memory_order_relaxed));
block.unsafeGetByPosition(i++).column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed));
block.unsafeGetByPosition(i++).column->insert(merge.rows_written.load(std::memory_order_relaxed));
block.unsafeGetByPosition(i++).column->insert(merge.columns_written.load(std::memory_order_relaxed));
}
Block block{
col_database,
col_table,
col_elapsed,
col_progress,
col_num_parts,
col_result_part_name,
col_total_size_bytes_compressed,
col_total_size_marks,
col_bytes_read_uncompressed,
col_rows_read,
col_bytes_written_uncompressed,
col_rows_written,
col_columns_written,
col_rows_with_key_columns_read,
col_rows_with_key_columns_written
};
return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)};
}


@ -41,11 +41,13 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_)
{ "quota_key", std::make_shared<DataTypeString>() },
{ "elapsed", std::make_shared<DataTypeFloat64>() },
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_read", std::make_shared<DataTypeUInt64>() },
{ "read_rows", std::make_shared<DataTypeUInt64>() },
{ "read_bytes", std::make_shared<DataTypeUInt64>() },
{ "total_rows_approx", std::make_shared<DataTypeUInt64>() },
{ "written_rows", std::make_shared<DataTypeUInt64>() },
{ "written_bytes", std::make_shared<DataTypeUInt64>() },
{ "memory_usage", std::make_shared<DataTypeInt64>() },
{ "query", std::make_shared<DataTypeString>() },
{ "query", std::make_shared<DataTypeString>() }
}
{
}
@ -95,9 +97,11 @@ BlockInputStreams StorageSystemProcesses::read(
block.unsafeGetByPosition(i++).column->insert(process.client_info.http_user_agent);
block.unsafeGetByPosition(i++).column->insert(process.client_info.quota_key);
block.unsafeGetByPosition(i++).column->insert(process.elapsed_seconds);
block.unsafeGetByPosition(i++).column->insert(process.rows);
block.unsafeGetByPosition(i++).column->insert(process.bytes);
block.unsafeGetByPosition(i++).column->insert(process.read_rows);
block.unsafeGetByPosition(i++).column->insert(process.read_bytes);
block.unsafeGetByPosition(i++).column->insert(process.total_rows);
block.unsafeGetByPosition(i++).column->insert(process.written_rows);
block.unsafeGetByPosition(i++).column->insert(process.written_bytes);
block.unsafeGetByPosition(i++).column->insert(process.memory_usage);
block.unsafeGetByPosition(i++).column->insert(process.query);
}


@ -160,7 +160,7 @@ def main(args):
report_testcase.append(stderr_element)
failures = failures + 1
print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr))
print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr.encode('utf-8')))
elif 'Exception' in stdout:
failure = et.Element("error", attrib = {"message": "having exception"})
report_testcase.append(failure)
@ -170,7 +170,7 @@ def main(args):
report_testcase.append(stdout_element)
failures = failures + 1
print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout))
print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout.encode('utf-8')))
elif not os.path.isfile(reference_file):
skipped = et.Element("skipped", attrib = {"message": "no reference file"})
report_testcase.append(skipped)


@ -1434,7 +1434,7 @@ function generate_selectors(elem) {
filter(function(run) { return run.data_size == current_data_size; }).
map(function(run) { return run.system; });
for (var i in systems) {
for (var i = 0; i < systems.length; i++) {
var selected = current_systems.indexOf(systems[i]) != -1;
var available = available_systems_for_current_data_size.indexOf(systems[i]) != -1;
@ -1449,7 +1449,7 @@ function generate_selectors(elem) {
html += "Dataset size: ";
for (var i in data_sizes) {
for (var i = 0; i < data_sizes.length; i++) {
html += "<span class='" + (data_sizes[i].id == current_data_size ? "selected" : "") + "' data-size-id='" + data_sizes[i].id + "'>" + data_sizes[i].name + "</span> ";
}
@ -1458,8 +1458,8 @@ function generate_selectors(elem) {
html += "Run number: ";
for (var i in runs) {
html += "<span class='" + (current_runs.indexOf(i) != -1 ? "selected" : "") + "' data-run-id='" + i + "'>" + runs[i] + "</span> ";
for (var i = 0; i < runs.length; i++) {
html += "<span class='" + (current_runs.indexOf(String(i)) != -1 ? "selected" : "") + "' data-run-id='" + i + "'>" + runs[i] + "</span> ";
}
html += "</p>";
@ -1553,15 +1553,15 @@ function generate_comparison_table() {
html += "<table class='comparison_table'>";
html += "<tr>";
html += "<th></th>";
html += "<th><input id='query_checkbox_toggler' type='checkbox' checked /></th>";
html += "<th style='text-align: left;'>Query</th>";
for (j in filtered_results) {
for (var j = 0; j < filtered_results.length; j++) {
html += "<th colspan='" + current_runs.length + "'>" + filtered_results[j].system +
(filtered_results[j].version ? " (" + filtered_results[j].version + ")" : "") + "</th>";
}
html += "</tr>";
for (i in queries) {
for (var i = 0; i < queries.length; i++) {
html += "<tr>";
html += "<td><input id='query_checkbox" + i + "' type='checkbox' " +
($('#query_checkbox' + i).length == 0 || $('#query_checkbox' + i).is(':checked') ? "checked" : "") + " /></td>";
@ -1571,8 +1571,8 @@ function generate_comparison_table() {
// Compute the maximum and minimum execution times across systems, for each of the three runs.
var minimums = [0, 0, 0], maximums = [0, 0, 0];
for (j in filtered_results) {
for (current_run_idx in current_runs) {
for (var j = 0; j < filtered_results.length; j++) {
for (var current_run_idx = 0; current_run_idx < current_runs.length; current_run_idx++) {
var k = current_runs[current_run_idx];
var value = filtered_results[j].result[i][k];
@ -1591,12 +1591,12 @@ function generate_comparison_table() {
}
}
for (j in filtered_results) {
for (var j = 0; j < filtered_results.length; j++) {
if (!ratios[j]) {
ratios[j] = [];
}
for (current_run_idx in current_runs) {
for (var current_run_idx = 0; current_run_idx < current_runs.length; current_run_idx++) {
var k = current_runs[current_run_idx];
var value = filtered_results[j].result[i][k];
@ -1616,21 +1616,21 @@ function generate_comparison_table() {
html += "</tr>";
}
if (current_systems.length > 1) {
if (current_systems.length) {
html += "<tr>";
html += "<td rowspan='2'></td>";
html += "<td rowspan='2'><div class='query_cell'>Geometric mean of ratios</div></td>";
for (j in filtered_results) {
for (k in current_runs) {
html += "<th id='totals" + j + "_" + k + "' class='number_cell' style='text-align: center; background-color: #FFF; font-weight: bold;'></th>";
for (var j = 0; j < filtered_results.length; j++) {
for (var k = 0; k < current_runs.length; k++) {
html += "<th id='totals" + j + "_" + current_runs[k] + "' class='number_cell' style='text-align: center; background-color: #FFF; font-weight: bold;'></th>";
}
}
html += "</tr>";
html += "<tr>";
for (j in filtered_results) {
for (var j = 0; j < filtered_results.length; j++) {
html += "<th id='absolute_totals" + j + "' colspan='" + current_runs.length + "' class='number_cell' style='text-align: center; background-color: #FFF; font-weight: bold;'></th>";
}
@ -1641,24 +1641,30 @@ function generate_comparison_table() {
$('#comparison_table').html(html);
for (i in queries) {
for (var i = 0; i < queries.length; i++) {
$('#query_checkbox' + i).click(function() { calculate_totals(); generate_diagram(); } );
}
$('#query_checkbox_toggler').click(function() {
for (var i = 0; i < queries.length; i++) {
var item = $('#query_checkbox' + i);
item.prop("checked", !item.prop("checked"));
}
});
calculate_totals();
}
function calculate_totals() {
if (current_systems.length <= 1) return;
if (!current_systems.length) return;
var filtered_results = results.filter(function(x) {
return x.data_size == current_data_size && current_systems.indexOf(x.system) != -1; });
var total_ratios = [];
for (j in filtered_results) {
for (current_run_idx in current_runs) {
for (var j = 0; j < filtered_results.length; j++) {
for (var current_run_idx = 0; current_run_idx < current_runs.length; current_run_idx++) {
var k = current_runs[current_run_idx];
var current_ratios = ratios[j][k].filter(
@ -1677,7 +1683,7 @@ function calculate_totals() {
}
}
for (j in filtered_results) {
for (var j = 0; j < filtered_results.length; j++) {
var total_ratio = Math.pow(total_ratios[j], 1 / current_runs.length);
$("#absolute_totals" + j).attr("data-ratio", total_ratio).html("x" + total_ratio.toFixed(2));
}
@ -1696,8 +1702,8 @@ function generate_diagram() {
var max_total_ratio = 1;
var min_total_ratio = 0;
for (j in filtered_results) {
for (current_run_idx in current_runs) {
for (var j = 0; j < filtered_results.length; j++) {
for (var current_run_idx = 0; current_run_idx < current_runs.length; current_run_idx++) {
var k = current_runs[current_run_idx];
var ratio = +$("#totals" + j + "_" + k).attr("data-ratio");
@ -1723,7 +1729,7 @@ function generate_diagram() {
html += "<table style='width: 100%'>";
for (j in filtered_results) {
for (var j = 0; j < filtered_results.length; j++) {
var total_ratio = +$("#absolute_totals" + j).attr("data-ratio");
html += "<tr>";
@ -1732,7 +1738,7 @@ function generate_diagram() {
html += "<td style='width: 100%; padding-right: 20px;'>";
for (current_run_idx in current_runs) {
for (var current_run_idx = 0; current_run_idx < current_runs.length; current_run_idx++) {
var k = current_runs[current_run_idx];
var ratio = +$("#totals" + j + "_" + k).attr("data-ratio");
@ -1749,6 +1755,8 @@ function generate_diagram() {
html += "</td>";
//min_total_ratio = 1;
//total_ratio = 1;
html += "<td style='text-align: right; font-weight: bold;'>" + (total_ratio / min_total_ratio).toFixed(2) + "</td>";
html += "</tr>";
}