mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Refined query_log, processes and merges metrics.
Also, dbms/CMakeLists.txt was rewritten.
This commit is contained in:
parent
ca7d0a4742
commit
f8eb9028fd
@ -17,25 +17,80 @@ if (NOT ENABLE_LIBTCMALLOC)
|
||||
add_definitions(-D NO_TCMALLOC)
|
||||
endif ()
|
||||
|
||||
if (APPLE)
|
||||
set (AIO_CPP_FILES "")
|
||||
set (AIO_H_FILES "")
|
||||
set (APPLE_ICONV_LIB iconv)
|
||||
else()
|
||||
set (AIO_H_FILES include/DB/Common/AIO.h
|
||||
include/DB/IO/WriteBufferAIO.h
|
||||
include/DB/IO/ReadBufferAIO.h)
|
||||
set (AIO_CPP_FILES
|
||||
src/IO/ReadBufferAIO.cpp
|
||||
src/IO/WriteBufferAIO.cpp)
|
||||
set (APPLE_ICONV_LIB "")
|
||||
endif()
|
||||
|
||||
add_library (string_utils
|
||||
add_library(string_utils
|
||||
include/DB/Common/StringUtils.h
|
||||
src/Common/StringUtils.cpp)
|
||||
|
||||
add_library (dbms
|
||||
set(AIO_H_FILES
|
||||
include/DB/Common/AIO.h
|
||||
include/DB/IO/WriteBufferAIO.h
|
||||
include/DB/IO/ReadBufferAIO.h)
|
||||
set (AIO_CPP_FILES
|
||||
src/IO/ReadBufferAIO.cpp
|
||||
src/IO/WriteBufferAIO.cpp)
|
||||
set (APPLE_ICONV_LIB)
|
||||
|
||||
if (APPLE)
|
||||
set(APPLE_ICONV_LIB iconv)
|
||||
endif()
|
||||
|
||||
macro(add_glob cur_list)
|
||||
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${ARGN})
|
||||
list(APPEND ${cur_list} ${__tmp})
|
||||
endmacro()
|
||||
|
||||
macro(add_headers_and_sources prefix common_path)
|
||||
add_glob(${prefix}_headers RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/DB/${common_path}/*.h include/DB/${common_path}/*.inl)
|
||||
add_glob(${prefix}_sources src/${common_path}/*.cpp src/${common_path}/*.h)
|
||||
endmacro()
|
||||
|
||||
macro(add_headers_only prefix common_path)
|
||||
add_glob(${prefix}_headers RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/DB/${common_path}/*.h src/${common_path}/*.h)
|
||||
endmacro()
|
||||
|
||||
set(dbms_headers)
|
||||
set(dbms_sources)
|
||||
add_headers_and_sources(dbms Functions)
|
||||
add_headers_and_sources(dbms Functions/Conditional)
|
||||
add_headers_and_sources(dbms TableFunctions)
|
||||
add_headers_and_sources(dbms Parsers)
|
||||
add_headers_and_sources(dbms AggregateFunctions)
|
||||
add_headers_and_sources(dbms Core)
|
||||
add_headers_and_sources(dbms DataStreams)
|
||||
add_headers_and_sources(dbms DataTypes)
|
||||
add_headers_and_sources(dbms Databases)
|
||||
add_headers_and_sources(dbms DataBases/Distributed)
|
||||
add_headers_and_sources(dbms Dictionaries)
|
||||
add_headers_and_sources(dbms Dictionaries/Embedded)
|
||||
add_headers_and_sources(dbms Interpreters)
|
||||
add_headers_and_sources(dbms Interpreters/ClusterProxy)
|
||||
add_headers_and_sources(dbms Common)
|
||||
add_headers_and_sources(dbms Common/HashTable)
|
||||
add_headers_and_sources(dbms IO)
|
||||
add_headers_and_sources(dbms Columns)
|
||||
add_headers_and_sources(dbms Storages)
|
||||
add_headers_and_sources(dbms Storages/Distributed)
|
||||
add_headers_and_sources(dbms Storages/MergeTree)
|
||||
add_headers_and_sources(dbms Storages/System)
|
||||
add_headers_and_sources(dbms Core)
|
||||
add_headers_and_sources(dbms Client)
|
||||
add_headers_only(dbms Server)
|
||||
|
||||
list(REMOVE_ITEM dbms_sources
|
||||
src/Client/Client.cpp
|
||||
src/Client/Benchmark.cpp
|
||||
src/Storages/StorageCloud.cpp
|
||||
src/Databases/DatabaseCloud.cpp
|
||||
src/Common/StringUtils.cpp)
|
||||
|
||||
if (APPLE)
|
||||
list(REMOVE_ITEM dbms_headers ${AIO_H_FILES})
|
||||
list(REMOVE_ITEM dbms_sources ${AIO_CPP_FILES})
|
||||
endif()
|
||||
|
||||
add_library(dbms ${dbms_headers} ${dbms_sources})
|
||||
|
||||
set (dbms_all_sources_old
|
||||
src/Server/InterserverIOHTTPHandler.h
|
||||
src/Server/Server.h
|
||||
src/Server/TCPHandler.h
|
||||
@ -221,6 +276,7 @@ add_library (dbms
|
||||
include/DB/DataStreams/glueBlockInputStreams.h
|
||||
include/DB/DataStreams/CollapsingSortedBlockInputStream.h
|
||||
include/DB/DataStreams/IBlockOutputStream.h
|
||||
include/DB/DataStreams/InputStreamFromASTInsertQuery.h
|
||||
include/DB/DataStreams/AsynchronousBlockInputStream.h
|
||||
include/DB/DataStreams/MaterializingBlockInputStream.h
|
||||
include/DB/DataStreams/ParallelInputsProcessor.h
|
||||
@ -977,6 +1033,31 @@ add_library (dbms
|
||||
${MONGODB_FILES}
|
||||
)
|
||||
|
||||
######
|
||||
list(REMOVE_DUPLICATES dbms_all_sources_old)
|
||||
list(SORT dbms_all_sources_old)
|
||||
|
||||
set(dbms_all_sources ${dbms_headers} ${dbms_sources})
|
||||
list(REMOVE_DUPLICATES dbms_all_sources)
|
||||
list(SORT dbms_all_sources)
|
||||
|
||||
list(LENGTH dbms_all_sources_old len_old)
|
||||
list(LENGTH dbms_all_sources len_new)
|
||||
set(dbms_all_sources_union ${dbms_all_sources} ${dbms_all_sources_old})
|
||||
list(LENGTH dbms_all_sources_union len_union)
|
||||
list(REMOVE_DUPLICATES dbms_all_sources_union)
|
||||
list(LENGTH dbms_all_sources_union len_union_uniq)
|
||||
message(STATUS "len_old ${len_old}, len_new ${len_new}, len_union ${len_union}, len_union_uniq ${len_union_uniq}")
|
||||
|
||||
set(dbms_all_sources_not_presented ${dbms_all_sources_old})
|
||||
list(REMOVE_ITEM dbms_all_sources_not_presented ${dbms_all_sources})
|
||||
message(STATUS "not presented: ${dbms_all_sources_not_presented}")
|
||||
|
||||
set(dbms_all_sources_extra ${dbms_all_sources})
|
||||
list(REMOVE_ITEM dbms_all_sources_extra ${dbms_all_sources_old})
|
||||
message(STATUS "dbms_all_sources_extra: ${dbms_all_sources_extra}")
|
||||
######
|
||||
|
||||
if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
|
||||
# Won't generate debug info for files with heavy template instantiation to achieve faster linking and lower size.
|
||||
set_source_files_properties(
|
||||
|
@ -24,15 +24,15 @@ struct BlockIO
|
||||
Block in_sample; /// Пример блока, который будет прочитан из in.
|
||||
Block out_sample; /// Пример блока, которого нужно писать в out.
|
||||
|
||||
/// Здесь могут быть установлены колбэки для логгирования запроса.
|
||||
std::function<void(IBlockInputStream *)> finish_callback;
|
||||
std::function<void()> exception_callback;
|
||||
/// Callbacks for query logging could be set here.
|
||||
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
|
||||
std::function<void()> exception_callback;
|
||||
|
||||
/// Вызывайте эти функции, если нужно логгировать запрос.
|
||||
void onFinish()
|
||||
{
|
||||
if (finish_callback)
|
||||
finish_callback(in.get());
|
||||
finish_callback(in.get(), out.get());
|
||||
}
|
||||
|
||||
void onException()
|
||||
|
62
dbms/include/DB/DataStreams/CountingBlockOutputStream.h
Normal file
62
dbms/include/DB/DataStreams/CountingBlockOutputStream.h
Normal file
@ -0,0 +1,62 @@
|
||||
#pragma once
|
||||
#include <DB/DataStreams/IBlockOutputStream.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/Interpreters/ProcessList.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
/// Proxy class which counts number of written block, rows, bytes
|
||||
class CountingBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
|
||||
CountingBlockOutputStream(const BlockOutputStreamPtr & stream_)
|
||||
: stream(stream_) {}
|
||||
|
||||
void setProgressCallback(ProgressCallback callback)
|
||||
{
|
||||
progress_callback = callback;
|
||||
}
|
||||
|
||||
void setProcessListElement(ProcessListElement * elem)
|
||||
{
|
||||
process_elem = elem;
|
||||
}
|
||||
|
||||
const Progress & getProgress() const
|
||||
{
|
||||
return progress;
|
||||
}
|
||||
|
||||
void write(const Block & block) override
|
||||
{
|
||||
stream->write(block);
|
||||
|
||||
Progress local_progress(block.rowsInFirstColumn(), block.bytes(), 0);
|
||||
progress.incrementPiecewiseAtomically(local_progress);
|
||||
|
||||
if (process_elem)
|
||||
process_elem->updateProgressOut(local_progress);
|
||||
|
||||
if (progress_callback)
|
||||
progress_callback(local_progress);
|
||||
}
|
||||
|
||||
void writePrefix() override { stream->writePrefix(); }
|
||||
void writeSuffix() override { stream->writeSuffix(); }
|
||||
void flush() override { stream->flush(); }
|
||||
void onProgress(const Progress & progress) override { stream->onProgress(progress); }
|
||||
String getContentType() const override { return stream->getContentType(); }
|
||||
|
||||
protected:
|
||||
|
||||
BlockOutputStreamPtr stream;
|
||||
Progress progress;
|
||||
ProgressCallback progress_callback;
|
||||
ProcessListElement * process_elem = nullptr;
|
||||
};
|
||||
|
||||
}
|
83
dbms/include/DB/DataStreams/InputStreamFromASTInsertQuery.h
Normal file
83
dbms/include/DB/DataStreams/InputStreamFromASTInsertQuery.h
Normal file
@ -0,0 +1,83 @@
|
||||
#pragma once
|
||||
#include <DB/Parsers/ASTInsertQuery.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/IO/ConcatReadBuffer.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
|
||||
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & istr, const BlockIO & streams, Context & context)
|
||||
{
|
||||
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
|
||||
|
||||
if (!ast_insert_query)
|
||||
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
String format = ast_insert_query->format;
|
||||
if (format.empty())
|
||||
format = "Values";
|
||||
|
||||
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (istr) part of query.
|
||||
|
||||
buf1 = std::make_unique<ReadBuffer>(
|
||||
const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
|
||||
|
||||
if (ast_insert_query->data)
|
||||
buffers.push_back(buf1.get());
|
||||
buffers.push_back(&istr);
|
||||
|
||||
/** NOTE Must not read from 'istr' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
|
||||
* - because 'query.data' could refer to memory piece, used as buffer for 'istr'.
|
||||
*/
|
||||
|
||||
data_istr = std::make_unique<ConcatReadBuffer>(buffers);
|
||||
|
||||
res_stream = context.getInputFormat(format, *data_istr, streams.out_sample, context.getSettings().max_insert_block_size);
|
||||
}
|
||||
|
||||
Block readImpl() override
|
||||
{
|
||||
return res_stream->read();
|
||||
}
|
||||
|
||||
void readPrefixImpl() override
|
||||
{
|
||||
return res_stream->readPrefix();
|
||||
}
|
||||
|
||||
void readSuffixImpl() override
|
||||
{
|
||||
return res_stream->readSuffix();
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
return "InputStreamFromASTInsertQuery";
|
||||
}
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
return "InputStreamFromASTInsertQuery(" + toString(this) + ")";
|
||||
}
|
||||
|
||||
private:
|
||||
ConcatReadBuffer::ReadBuffers buffers;
|
||||
std::unique_ptr<ReadBuffer> buf1;
|
||||
std::unique_ptr<ReadBuffer> data_istr;
|
||||
|
||||
BlockInputStreamPtr res_stream;
|
||||
};
|
||||
|
||||
}
|
@ -37,9 +37,11 @@ struct ProcessInfo
|
||||
{
|
||||
String query;
|
||||
double elapsed_seconds;
|
||||
size_t rows;
|
||||
size_t bytes;
|
||||
size_t read_rows;
|
||||
size_t read_bytes;
|
||||
size_t total_rows;
|
||||
size_t written_rows;
|
||||
size_t written_bytes;
|
||||
Int64 memory_usage;
|
||||
ClientInfo client_info;
|
||||
};
|
||||
@ -53,7 +55,10 @@ struct ProcessListElement
|
||||
|
||||
Stopwatch watch;
|
||||
|
||||
Progress progress;
|
||||
/// Progress of input stream
|
||||
Progress progress_in;
|
||||
/// Progress of output stream
|
||||
Progress progress_out;
|
||||
|
||||
MemoryTracker memory_tracker;
|
||||
|
||||
@ -88,9 +93,9 @@ struct ProcessListElement
|
||||
current_memory_tracker = nullptr;
|
||||
}
|
||||
|
||||
bool update(const Progress & value)
|
||||
bool updateProgressIn(const Progress & value)
|
||||
{
|
||||
progress.incrementPiecewiseAtomically(value);
|
||||
progress_in.incrementPiecewiseAtomically(value);
|
||||
|
||||
if (priority_handle)
|
||||
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable.
|
||||
@ -98,6 +103,13 @@ struct ProcessListElement
|
||||
return !is_cancelled;
|
||||
}
|
||||
|
||||
bool updateProgressOut(const Progress & value)
|
||||
{
|
||||
progress_out.incrementPiecewiseAtomically(value);
|
||||
return !is_cancelled;
|
||||
}
|
||||
|
||||
|
||||
ProcessInfo getInfo() const
|
||||
{
|
||||
ProcessInfo res;
|
||||
@ -105,9 +117,11 @@ struct ProcessListElement
|
||||
res.query = query;
|
||||
res.client_info = client_info;
|
||||
res.elapsed_seconds = watch.elapsedSeconds();
|
||||
res.rows = progress.rows;
|
||||
res.bytes = progress.bytes;
|
||||
res.total_rows = progress.total_rows;
|
||||
res.read_rows = progress_in.rows;
|
||||
res.read_bytes = progress_in.bytes;
|
||||
res.total_rows = progress_in.total_rows;
|
||||
res.written_rows = progress_out.rows;
|
||||
res.written_bytes = progress_out.bytes;
|
||||
res.memory_usage = memory_tracker.get();
|
||||
|
||||
return res;
|
||||
|
@ -7,14 +7,13 @@ namespace DB
|
||||
{
|
||||
|
||||
|
||||
/** Позволяет логгировать информацию о выполнении запросов:
|
||||
* - о начале выполнения запроса;
|
||||
* - метрики производительности, после выполнения запроса;
|
||||
* - об ошибках при выполнении запроса.
|
||||
/** Allows to log information about queries execution:
|
||||
* - info about start of query execution;
|
||||
* - performance metrics (are set at the end of query execution);
|
||||
* - info about errors of query execution.
|
||||
*/
|
||||
|
||||
/** Что логгировать.
|
||||
*/
|
||||
/// A struct which will be inserted as row into query_log table
|
||||
struct QueryLogElement
|
||||
{
|
||||
enum Type
|
||||
@ -27,7 +26,7 @@ struct QueryLogElement
|
||||
|
||||
Type type = QUERY_START;
|
||||
|
||||
/// В зависимости от типа, не все поля могут быть заполнены.
|
||||
/// Depending on the type of query and type of stage, not all the fields may be filled.
|
||||
|
||||
time_t event_time{};
|
||||
time_t query_start_time{};
|
||||
@ -36,6 +35,11 @@ struct QueryLogElement
|
||||
UInt64 read_rows{};
|
||||
UInt64 read_bytes{};
|
||||
|
||||
UInt64 written_rows{};
|
||||
UInt64 written_bytes{};
|
||||
|
||||
/// NOTE: Not obvious metric.
|
||||
/// It always approximately equal to read_rows or written_rows at the end of query execution.
|
||||
UInt64 result_rows{};
|
||||
UInt64 result_bytes{};
|
||||
|
||||
|
@ -34,20 +34,13 @@ struct MergeInfo
|
||||
std::atomic<UInt64> bytes_read_uncompressed{};
|
||||
std::atomic<UInt64> bytes_written_uncompressed{};
|
||||
|
||||
/// Updated only for Horizontal algorithm
|
||||
/// In case of Vertical algorithm they are actual only for primary key columns
|
||||
std::atomic<UInt64> rows_read{};
|
||||
std::atomic<UInt64> rows_written{};
|
||||
|
||||
/// Updated only for Vertical algorithm
|
||||
/// mutually exclusive with rows_read and rows_written, updated either rows_written either columns_written
|
||||
std::atomic<UInt64> columns_written{};
|
||||
|
||||
/// Updated in both cases
|
||||
/// Number of rows for which primary key columns have been written
|
||||
std::atomic<UInt64> rows_with_key_columns_read{};
|
||||
std::atomic<UInt64> rows_with_key_columns_written{};
|
||||
|
||||
|
||||
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
|
||||
: database{database}, table{table}, result_part_name{result_part_name}
|
||||
{
|
||||
@ -66,9 +59,7 @@ struct MergeInfo
|
||||
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
|
||||
rows_read(other.rows_read.load(std::memory_order_relaxed)),
|
||||
rows_written(other.rows_written.load(std::memory_order_relaxed)),
|
||||
columns_written(other.columns_written.load(std::memory_order_relaxed)),
|
||||
rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)),
|
||||
rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed))
|
||||
columns_written(other.columns_written.load(std::memory_order_relaxed))
|
||||
{
|
||||
}
|
||||
};
|
||||
|
@ -6,7 +6,6 @@
|
||||
#include <DB/Interpreters/ProcessList.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -228,15 +227,15 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
|
||||
|
||||
if (process_list_elem)
|
||||
{
|
||||
if (!process_list_elem->update(value))
|
||||
if (!process_list_elem->updateProgressIn(value))
|
||||
cancel();
|
||||
|
||||
/// Общее количество данных, обработанных или предполагаемых к обработке во всех листовых источниках, возможно, на удалённых серверах.
|
||||
|
||||
size_t rows_processed = process_list_elem->progress.rows;
|
||||
size_t bytes_processed = process_list_elem->progress.bytes;
|
||||
size_t rows_processed = process_list_elem->progress_in.rows;
|
||||
size_t bytes_processed = process_list_elem->progress_in.bytes;
|
||||
|
||||
size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress.total_rows.load(std::memory_order_relaxed));
|
||||
size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress_in.total_rows.load(std::memory_order_relaxed));
|
||||
|
||||
/** Проверяем ограничения на объём данных для чтения, скорость выполнения запроса, квоту на объём данных для чтения.
|
||||
* NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList?
|
||||
@ -270,7 +269,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
|
||||
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
size_t total_rows = process_list_elem->progress.total_rows;
|
||||
size_t total_rows = process_list_elem->progress_in.total_rows;
|
||||
|
||||
if (limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
|
||||
{
|
||||
@ -283,7 +282,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
|
||||
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
|
||||
ErrorCodes::TOO_SLOW);
|
||||
|
||||
size_t total_rows = process_list_elem->progress.total_rows;
|
||||
size_t total_rows = process_list_elem->progress_in.total_rows;
|
||||
|
||||
/// Если предсказанное время выполнения больше, чем max_execution_time.
|
||||
if (limits.max_execution_time != 0 && total_rows)
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <DB/DataStreams/PushingToViewsBlockOutputStream.h>
|
||||
#include <DB/DataStreams/NullAndDoCopyBlockInputStream.h>
|
||||
#include <DB/DataStreams/SquashingBlockOutputStream.h>
|
||||
#include <DB/DataStreams/CountingBlockOutputStream.h>
|
||||
#include <DB/DataStreams/copyData.h>
|
||||
|
||||
#include <DB/Parsers/ASTInsertQuery.h>
|
||||
@ -101,6 +102,10 @@ BlockIO InterpreterInsertQuery::execute()
|
||||
context.getSettingsRef().min_insert_block_size_rows,
|
||||
context.getSettingsRef().min_insert_block_size_bytes);
|
||||
|
||||
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
|
||||
out_wrapper->setProcessListElement(context.getProcessListElement());
|
||||
out = std::move(out_wrapper);
|
||||
|
||||
BlockIO res;
|
||||
res.out_sample = getSampleBlock();
|
||||
|
||||
|
@ -28,6 +28,9 @@ Block QueryLogElement::createBlock()
|
||||
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "read_rows"},
|
||||
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "read_bytes"},
|
||||
|
||||
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "written_rows"},
|
||||
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "written_bytes"},
|
||||
|
||||
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "result_rows"},
|
||||
{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "result_bytes"},
|
||||
|
||||
@ -102,6 +105,9 @@ void QueryLogElement::appendToBlock(Block & block) const
|
||||
block.unsafeGetByPosition(i++).column->insert(UInt64(read_rows));
|
||||
block.unsafeGetByPosition(i++).column->insert(UInt64(read_bytes));
|
||||
|
||||
block.unsafeGetByPosition(i++).column->insert(UInt64(written_rows));
|
||||
block.unsafeGetByPosition(i++).column->insert(UInt64(written_bytes));
|
||||
|
||||
block.unsafeGetByPosition(i++).column->insert(UInt64(result_rows));
|
||||
block.unsafeGetByPosition(i++).column->insert(UInt64(result_bytes));
|
||||
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include <DB/DataStreams/BlockIO.h>
|
||||
#include <DB/DataStreams/copyData.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/DataStreams/InputStreamFromASTInsertQuery.h>
|
||||
#include <DB/DataStreams/CountingBlockOutputStream.h>
|
||||
|
||||
#include <DB/Parsers/ASTInsertQuery.h>
|
||||
#include <DB/Parsers/ASTShowProcesslistQuery.h>
|
||||
@ -191,13 +193,23 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
|
||||
if (res.in)
|
||||
{
|
||||
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
|
||||
if (auto stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
|
||||
{
|
||||
stream->setProgressCallback(context.getProgressCallback());
|
||||
stream->setProcessListElement(context.getProcessListElement());
|
||||
}
|
||||
}
|
||||
|
||||
if (res.out)
|
||||
{
|
||||
if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
|
||||
{
|
||||
stream->setProcessListElement(context.getProcessListElement());
|
||||
}
|
||||
}
|
||||
if (!res.out || !dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
|
||||
LOG_DEBUG(&Logger::get("executeQuery"), "res.out " << res.out.get() << " is empty");
|
||||
|
||||
/// Everything related to query log.
|
||||
{
|
||||
QueryLogElement elem;
|
||||
@ -218,7 +230,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
context.getQueryLog().add(elem);
|
||||
|
||||
/// Also make possible for caller to log successful query finish and exception during execution.
|
||||
res.finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream) mutable
|
||||
res.finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
|
||||
{
|
||||
ProcessListElement * process_list_elem = context.getProcessListElement();
|
||||
|
||||
@ -232,22 +244,35 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
elem.event_time = time(0);
|
||||
elem.query_duration_ms = elapsed_seconds * 1000;
|
||||
|
||||
elem.read_rows = process_list_elem->progress.rows;
|
||||
elem.read_bytes = process_list_elem->progress.bytes;
|
||||
elem.read_rows = process_list_elem->progress_in.rows;
|
||||
elem.read_bytes = process_list_elem->progress_in.bytes;
|
||||
|
||||
elem.written_rows = process_list_elem->progress_out.rows;
|
||||
elem.written_bytes = process_list_elem->progress_out.bytes;
|
||||
|
||||
auto memory_usage = process_list_elem->memory_tracker.getPeak();
|
||||
elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
|
||||
|
||||
if (stream)
|
||||
if (stream_in)
|
||||
{
|
||||
if (IProfilingBlockInputStream * profiling_stream = dynamic_cast<IProfilingBlockInputStream *>(stream))
|
||||
if (auto profiling_stream = dynamic_cast<const IProfilingBlockInputStream *>(stream_in))
|
||||
{
|
||||
const BlockStreamProfileInfo & info = profiling_stream->getProfileInfo();
|
||||
|
||||
/// NOTE: INSERT SELECT query contains zero metrics
|
||||
elem.result_rows = info.rows;
|
||||
elem.result_bytes = info.bytes;
|
||||
}
|
||||
}
|
||||
else if (stream_out) /// will be used only for ordinary INSERT queries
|
||||
{
|
||||
if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
|
||||
{
|
||||
/// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.
|
||||
elem.result_rows = counting_stream->getProgress().rows;
|
||||
elem.result_bytes = counting_stream->getProgress().bytes;
|
||||
}
|
||||
}
|
||||
|
||||
if (elem.read_rows != 0)
|
||||
{
|
||||
@ -280,8 +305,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
|
||||
elem.query_duration_ms = elapsed_seconds * 1000;
|
||||
|
||||
elem.read_rows = process_list_elem->progress.rows;
|
||||
elem.read_bytes = process_list_elem->progress.bytes;
|
||||
elem.read_rows = process_list_elem->progress_in.rows;
|
||||
elem.read_bytes = process_list_elem->progress_in.bytes;
|
||||
|
||||
auto memory_usage = process_list_elem->memory_tracker.getPeak();
|
||||
elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
|
||||
@ -369,35 +394,8 @@ void executeQuery(
|
||||
{
|
||||
if (streams.out)
|
||||
{
|
||||
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
|
||||
|
||||
if (!ast_insert_query)
|
||||
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
String format = ast_insert_query->format;
|
||||
if (format.empty())
|
||||
format = "Values";
|
||||
|
||||
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (istr) part of query.
|
||||
|
||||
ConcatReadBuffer::ReadBuffers buffers;
|
||||
ReadBuffer buf1(const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
|
||||
|
||||
if (ast_insert_query->data)
|
||||
buffers.push_back(&buf1);
|
||||
buffers.push_back(&istr);
|
||||
|
||||
/** NOTE Must not read from 'istr' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
|
||||
* - because 'query.data' could refer to memory piece, used as buffer for 'istr'.
|
||||
*/
|
||||
|
||||
ConcatReadBuffer data_istr(buffers);
|
||||
|
||||
BlockInputStreamPtr in{
|
||||
context.getInputFormat(
|
||||
format, data_istr, streams.out_sample, context.getSettings().max_insert_block_size)};
|
||||
|
||||
copyData(*in, *streams.out);
|
||||
InputStreamFromASTInsertQuery in(ast, istr, streams, context);
|
||||
copyData(in, *streams.out);
|
||||
}
|
||||
|
||||
if (streams.in)
|
||||
@ -410,7 +408,7 @@ void executeQuery(
|
||||
|
||||
BlockOutputStreamPtr out = context.getOutputFormat(format_name, ostr, streams.in_sample);
|
||||
|
||||
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
|
||||
if (auto stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
|
||||
{
|
||||
/// NOTE Progress callback takes shared ownership of 'out'.
|
||||
stream->setProgressCallback([out] (const Progress & progress) { out->onProgress(progress); });
|
||||
|
@ -392,10 +392,9 @@ public:
|
||||
const ColumnSizeEstimator & column_sizes)
|
||||
: merge_entry(merge_entry_), merge_alg(merge_alg_)
|
||||
{
|
||||
if (merge_alg == MergeAlgorithm::Horizontal)
|
||||
average_elem_progress = 1.0 / num_total_rows;
|
||||
else
|
||||
average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows);
|
||||
average_elem_progress = (merge_alg == MergeAlgorithm::Horizontal)
|
||||
? 1.0 / num_total_rows
|
||||
: column_sizes.keyColumnsProgress(1, num_total_rows);
|
||||
}
|
||||
|
||||
MergeList::Entry & merge_entry;
|
||||
@ -405,19 +404,11 @@ public:
|
||||
void operator() (const Progress & value)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
|
||||
merge_entry->bytes_read_uncompressed += value.bytes;
|
||||
merge_entry->rows_with_key_columns_read += value.rows;
|
||||
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
|
||||
|
||||
if (merge_alg == MergeAlgorithm::Horizontal)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
|
||||
merge_entry->rows_read += value.rows;
|
||||
merge_entry->progress = average_elem_progress * merge_entry->rows_read;
|
||||
}
|
||||
else
|
||||
{
|
||||
merge_entry->progress = average_elem_progress * merge_entry->rows_with_key_columns_read;
|
||||
}
|
||||
merge_entry->bytes_read_uncompressed += value.bytes;
|
||||
merge_entry->rows_read += value.rows;
|
||||
merge_entry->progress = average_elem_progress * merge_entry->rows_read;
|
||||
};
|
||||
};
|
||||
|
||||
@ -597,9 +588,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
|
||||
rows_written += block.rows();
|
||||
to.write(block);
|
||||
|
||||
if (merge_alg == MergeAlgorithm::Horizontal)
|
||||
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
|
||||
merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows;
|
||||
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
|
||||
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
|
||||
|
||||
/// This update is unactual for VERTICAL algorithm sicne it requires more accurate per-column updates
|
||||
@ -619,7 +608,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
|
||||
/// Gather ordinary columns
|
||||
if (merge_alg == MergeAlgorithm::Vertical)
|
||||
{
|
||||
size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read;
|
||||
size_t sum_input_rows_exact = merge_entry->rows_read;
|
||||
merge_entry->columns_written = merging_column_names.size();
|
||||
merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact);
|
||||
|
||||
|
@ -25,9 +25,7 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
|
||||
{ "rows_read", std::make_shared<DataTypeUInt64>() },
|
||||
{ "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
|
||||
{ "rows_written", std::make_shared<DataTypeUInt64>() },
|
||||
{ "columns_written", std::make_shared<DataTypeUInt64>() },
|
||||
{ "rows_with_key_columns_read", std::make_shared<DataTypeUInt64>() },
|
||||
{ "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() }
|
||||
{ "columns_written", std::make_shared<DataTypeUInt64>() }
|
||||
}
|
||||
{
|
||||
}
|
||||
@ -62,8 +60,6 @@ BlockInputStreams StorageSystemMerges::read(
|
||||
ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_written_uncompressed"};
|
||||
ColumnWithTypeAndName col_rows_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_written"};
|
||||
ColumnWithTypeAndName col_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "columns_written"};
|
||||
ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_read"};
|
||||
ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"};
|
||||
|
||||
for (const auto & merge : context.getMergeList().get())
|
||||
{
|
||||
@ -80,8 +76,6 @@ BlockInputStreams StorageSystemMerges::read(
|
||||
col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed));
|
||||
col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed));
|
||||
col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed));
|
||||
col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed));
|
||||
col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed));
|
||||
}
|
||||
|
||||
Block block{
|
||||
@ -97,9 +91,7 @@ BlockInputStreams StorageSystemMerges::read(
|
||||
col_rows_read,
|
||||
col_bytes_written_uncompressed,
|
||||
col_rows_written,
|
||||
col_columns_written,
|
||||
col_rows_with_key_columns_read,
|
||||
col_rows_with_key_columns_written
|
||||
col_columns_written
|
||||
};
|
||||
|
||||
return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)};
|
||||
|
@ -41,11 +41,13 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_)
|
||||
{ "quota_key", std::make_shared<DataTypeString>() },
|
||||
|
||||
{ "elapsed", std::make_shared<DataTypeFloat64>() },
|
||||
{ "rows_read", std::make_shared<DataTypeUInt64>() },
|
||||
{ "bytes_read", std::make_shared<DataTypeUInt64>() },
|
||||
{ "total_rows_approx", std::make_shared<DataTypeUInt64>() },
|
||||
{ "read_rows", std::make_shared<DataTypeUInt64>() },
|
||||
{ "read_bytes", std::make_shared<DataTypeUInt64>() },
|
||||
{ "total_rows_approx", std::make_shared<DataTypeUInt64>() },
|
||||
{ "written_rows", std::make_shared<DataTypeUInt64>() },
|
||||
{ "written_bytes", std::make_shared<DataTypeUInt64>() },
|
||||
{ "memory_usage", std::make_shared<DataTypeInt64>() },
|
||||
{ "query", std::make_shared<DataTypeString>() },
|
||||
{ "query", std::make_shared<DataTypeString>() }
|
||||
}
|
||||
{
|
||||
}
|
||||
@ -95,9 +97,11 @@ BlockInputStreams StorageSystemProcesses::read(
|
||||
block.unsafeGetByPosition(i++).column->insert(process.client_info.http_user_agent);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.client_info.quota_key);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.elapsed_seconds);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.rows);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.bytes);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.read_rows);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.read_bytes);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.total_rows);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.written_rows);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.written_bytes);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.memory_usage);
|
||||
block.unsafeGetByPosition(i++).column->insert(process.query);
|
||||
}
|
||||
|
@ -30,17 +30,17 @@ MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", "cyan", attrs=['bold']) +
|
||||
def main(args):
|
||||
|
||||
SERVER_DIED = False
|
||||
|
||||
|
||||
|
||||
|
||||
def is_data_present():
|
||||
proc = Popen(args.client, stdin=PIPE, stdout=PIPE, stderr=PIPE)
|
||||
(stdout, stderr) = proc.communicate("EXISTS TABLE test.hits")
|
||||
if proc.returncode != 0:
|
||||
raise CalledProcessError(proc.returncode, args.client, stderr)
|
||||
|
||||
|
||||
return stdout.startswith('1')
|
||||
|
||||
|
||||
|
||||
|
||||
def dump_report(destination, suite, test_case, report):
|
||||
if destination is not None:
|
||||
destination_file = os.path.join(destination, suite, test_case + ".xml")
|
||||
@ -53,23 +53,23 @@ def main(args):
|
||||
report_suite.append(report)
|
||||
report_root.append(report_suite)
|
||||
report_file.write(et.tostring(report_root, encoding = "UTF-8", xml_declaration=True, pretty_print=True))
|
||||
|
||||
|
||||
|
||||
|
||||
if args.zookeeper is None:
|
||||
try:
|
||||
check_call(['grep', '-q', '<zookeeper', '/etc/clickhouse-server/config-preprocessed.xml'], )
|
||||
args.zookeeper = True
|
||||
except CalledProcessError:
|
||||
args.zookeeper = False
|
||||
|
||||
|
||||
base_dir = os.path.abspath(args.queries)
|
||||
|
||||
|
||||
failures_total = 0
|
||||
|
||||
|
||||
for suite in sorted(os.listdir(base_dir)):
|
||||
if SERVER_DIED:
|
||||
break
|
||||
|
||||
|
||||
suite_dir = os.path.join(base_dir, suite)
|
||||
suite_re_obj = re.search('^[0-9]+_(.*)$', suite)
|
||||
if not suite_re_obj: #skip .gitignore and so on
|
||||
@ -77,7 +77,7 @@ def main(args):
|
||||
suite = suite_re_obj.group(1)
|
||||
if os.path.isdir(suite_dir):
|
||||
print("\nRunning {} tests.\n".format(suite))
|
||||
|
||||
|
||||
failures = 0
|
||||
if 'stateful' in suite and not is_data_present():
|
||||
print("Won't run stateful tests because test data wasn't loaded. See README.txt.")
|
||||
@ -86,15 +86,15 @@ def main(args):
|
||||
for case in sorted(filter(lambda case: re.search(args.test, case) if args.test else True, os.listdir(suite_dir))):
|
||||
if SERVER_DIED:
|
||||
break
|
||||
|
||||
|
||||
case_file = os.path.join(suite_dir, case)
|
||||
if os.path.isfile(case_file) and (case.endswith('.sh') or case.endswith('.sql')):
|
||||
(name, ext) = os.path.splitext(case)
|
||||
report_testcase = et.Element("testcase", attrib = {"name": name})
|
||||
|
||||
|
||||
print "{0:70}".format(name + ": "),
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
if not args.zookeeper and 'zookeeper' in name:
|
||||
report_testcase.append(et.Element("skipped", attrib = {"message": "no zookeeper"}))
|
||||
print(MSG_SKIPPED + " - no zookeeper")
|
||||
@ -102,27 +102,27 @@ def main(args):
|
||||
reference_file = os.path.join(suite_dir, name) + '.reference'
|
||||
stdout_file = os.path.join(suite_dir, name) + '.stdout'
|
||||
stderr_file = os.path.join(suite_dir, name) + '.stderr'
|
||||
|
||||
|
||||
if ext == '.sql':
|
||||
command = "{0} --multiquery < {1} > {2} 2> {3}".format(args.client, case_file, stdout_file, stderr_file)
|
||||
else:
|
||||
command = "{0} > {1} 2> {2}".format(case_file, stdout_file, stderr_file)
|
||||
|
||||
|
||||
proc = Popen(command, shell = True)
|
||||
start_time = datetime.now()
|
||||
while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
|
||||
sleep(0)
|
||||
|
||||
|
||||
if proc.returncode is None:
|
||||
try:
|
||||
proc.kill()
|
||||
except OSError as e:
|
||||
if e.errno != ESRCH:
|
||||
raise
|
||||
|
||||
|
||||
failure = et.Element("failure", attrib = {"message": "Timeout"})
|
||||
report_testcase.append(failure)
|
||||
|
||||
|
||||
failures = failures + 1
|
||||
print("{0} - Timeout!".format(MSG_FAIL))
|
||||
else:
|
||||
@ -130,62 +130,62 @@ def main(args):
|
||||
stdout = unicode(stdout, errors='replace', encoding='utf-8')
|
||||
stderr = open(stderr_file, 'r').read() if os.path.exists(stderr_file) else ''
|
||||
stderr = unicode(stderr, errors='replace', encoding='utf-8')
|
||||
|
||||
|
||||
if proc.returncode != 0:
|
||||
failure = et.Element("failure", attrib = {"message": "return code {}".format(proc.returncode)})
|
||||
report_testcase.append(failure)
|
||||
|
||||
|
||||
stdout_element = et.Element("system-out")
|
||||
stdout_element.text = et.CDATA(stdout)
|
||||
report_testcase.append(stdout_element)
|
||||
|
||||
|
||||
failures = failures + 1
|
||||
print("{0} - return code {1}".format(MSG_FAIL, proc.returncode))
|
||||
|
||||
|
||||
if stderr:
|
||||
stderr_element = et.Element("system-err")
|
||||
stderr_element.text = et.CDATA(stderr)
|
||||
report_testcase.append(stderr_element)
|
||||
print(stderr)
|
||||
|
||||
|
||||
if 'Connection refused' in stderr or 'Attempt to read after eof' in stderr:
|
||||
SERVER_DIED = True
|
||||
|
||||
|
||||
elif stderr:
|
||||
failure = et.Element("failure", attrib = {"message": "having stderror"})
|
||||
report_testcase.append(failure)
|
||||
|
||||
|
||||
stderr_element = et.Element("system-err")
|
||||
stderr_element.text = et.CDATA(stderr)
|
||||
report_testcase.append(stderr_element)
|
||||
|
||||
|
||||
failures = failures + 1
|
||||
print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr))
|
||||
print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr.encode('utf-8')))
|
||||
elif 'Exception' in stdout:
|
||||
failure = et.Element("error", attrib = {"message": "having exception"})
|
||||
report_testcase.append(failure)
|
||||
|
||||
|
||||
stdout_element = et.Element("system-out")
|
||||
stdout_element.text = et.CDATA(stdout)
|
||||
report_testcase.append(stdout_element)
|
||||
|
||||
|
||||
failures = failures + 1
|
||||
print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout))
|
||||
print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout.encode('utf-8')))
|
||||
elif not os.path.isfile(reference_file):
|
||||
skipped = et.Element("skipped", attrib = {"message": "no reference file"})
|
||||
report_testcase.append(skipped)
|
||||
print("{0} - no reference file".format(MSG_UNKNOWN))
|
||||
else:
|
||||
(diff, _) = Popen(['diff', reference_file, stdout_file], stdout = PIPE).communicate()
|
||||
|
||||
|
||||
if diff:
|
||||
failure = et.Element("failure", attrib = {"message": "result differs with reference"})
|
||||
report_testcase.append(failure)
|
||||
|
||||
|
||||
stdout_element = et.Element("system-out")
|
||||
stdout_element.text = et.CDATA(diff)
|
||||
report_testcase.append(stdout_element)
|
||||
|
||||
|
||||
failures = failures + 1
|
||||
print("{0} - result differs with reference:\n{1}".format(MSG_FAIL, diff))
|
||||
else:
|
||||
@ -194,18 +194,18 @@ def main(args):
|
||||
os.remove(stdout_file)
|
||||
if os.path.exists(stderr_file):
|
||||
os.remove(stderr_file)
|
||||
|
||||
|
||||
dump_report(args.output, suite, name, report_testcase)
|
||||
|
||||
|
||||
failures_total = failures_total + failures
|
||||
|
||||
|
||||
if failures_total > 0:
|
||||
print(colored("\nHaving {0} errors!".format(failures_total), "red", attrs=["bold"]))
|
||||
sys.exit(1)
|
||||
else:
|
||||
print(colored("\nAll tests passed.", "green", attrs=["bold"]))
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = ArgumentParser(description = 'ClickHouse functional tests')
|
||||
@ -214,11 +214,11 @@ if __name__ == '__main__':
|
||||
parser.add_argument('-o', '--output', help = 'Output xUnit compliant test report directory')
|
||||
parser.add_argument('-t', '--timeout', type = int, default = 600, help = 'Timeout for each test case in seconds')
|
||||
parser.add_argument('test', nargs = '?', help = 'Optional test case name regex')
|
||||
|
||||
|
||||
group = parser.add_mutually_exclusive_group(required = False)
|
||||
group.add_argument('--zookeeper', action = 'store_true', default = None, dest = 'zookeeper', help = 'Run zookeeper related tests')
|
||||
group.add_argument('--no-zookeeper', action = 'store_false', default = None, dest = 'zookeeper', help = 'Do not run zookeeper related tests')
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
main(args)
|
||||
|
Loading…
Reference in New Issue
Block a user