diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index d4ca884e067..10985e8ca3b 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -17,25 +17,80 @@ if (NOT ENABLE_LIBTCMALLOC)
     add_definitions(-D NO_TCMALLOC)
 endif ()
 
-if (APPLE)
-    set (AIO_CPP_FILES "")
-    set (AIO_H_FILES "")
-    set (APPLE_ICONV_LIB iconv)
-else()
-    set (AIO_H_FILES include/DB/Common/AIO.h
-        include/DB/IO/WriteBufferAIO.h
-        include/DB/IO/ReadBufferAIO.h)
-    set (AIO_CPP_FILES
-        src/IO/ReadBufferAIO.cpp
-        src/IO/WriteBufferAIO.cpp)
-    set (APPLE_ICONV_LIB "")
-endif()
-
-add_library (string_utils
+add_library(string_utils
     include/DB/Common/StringUtils.h
     src/Common/StringUtils.cpp)
 
-add_library (dbms
+set(AIO_H_FILES
+    include/DB/Common/AIO.h
+    include/DB/IO/WriteBufferAIO.h
+    include/DB/IO/ReadBufferAIO.h)
+set(AIO_CPP_FILES
+    src/IO/ReadBufferAIO.cpp
+    src/IO/WriteBufferAIO.cpp)
+set(APPLE_ICONV_LIB)
+
+if (APPLE)
+    set(APPLE_ICONV_LIB iconv)
+endif()
+
+macro(add_glob cur_list)
+    file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${ARGN})
+    list(APPEND ${cur_list} ${__tmp})
+endmacro()
+
+macro(add_headers_and_sources prefix common_path)
+    add_glob(${prefix}_headers include/DB/${common_path}/*.h include/DB/${common_path}/*.inl)
+    add_glob(${prefix}_sources src/${common_path}/*.cpp src/${common_path}/*.h)
+endmacro()
+
+macro(add_headers_only prefix common_path)
+    add_glob(${prefix}_headers include/DB/${common_path}/*.h src/${common_path}/*.h)
+endmacro()
+
+set(dbms_headers)
+set(dbms_sources)
+add_headers_and_sources(dbms Functions)
+add_headers_and_sources(dbms Functions/Conditional)
+add_headers_and_sources(dbms TableFunctions)
+add_headers_and_sources(dbms Parsers)
+add_headers_and_sources(dbms AggregateFunctions)
+add_headers_and_sources(dbms Core)
+add_headers_and_sources(dbms DataStreams)
+add_headers_and_sources(dbms DataTypes)
+add_headers_and_sources(dbms Databases)
+add_headers_and_sources(dbms Databases/Distributed)
+add_headers_and_sources(dbms Dictionaries)
+add_headers_and_sources(dbms Dictionaries/Embedded)
+add_headers_and_sources(dbms Interpreters)
+add_headers_and_sources(dbms Interpreters/ClusterProxy)
+add_headers_and_sources(dbms Common)
+add_headers_and_sources(dbms Common/HashTable)
+add_headers_and_sources(dbms IO)
+add_headers_and_sources(dbms Columns)
+add_headers_and_sources(dbms Storages)
+add_headers_and_sources(dbms Storages/Distributed)
+add_headers_and_sources(dbms Storages/MergeTree)
+add_headers_and_sources(dbms Storages/System)
+add_headers_and_sources(dbms Client)
+add_headers_only(dbms Server)
+
+list(REMOVE_ITEM dbms_sources
+    src/Client/Client.cpp
+    src/Client/Benchmark.cpp
+    src/Storages/StorageCloud.cpp
+    src/Databases/DatabaseCloud.cpp
+    src/Common/StringUtils.cpp)
+
+if (APPLE)
+    list(REMOVE_ITEM dbms_headers ${AIO_H_FILES})
+    list(REMOVE_ITEM dbms_sources ${AIO_CPP_FILES})
+endif()
+
+add_library(dbms ${dbms_headers} ${dbms_sources})
+
+set(dbms_all_sources_old
     src/Server/InterserverIOHTTPHandler.h
     src/Server/Server.h
     src/Server/TCPHandler.h
@@ -221,6 +276,7 @@ add_library (dbms
     include/DB/DataStreams/glueBlockInputStreams.h
     include/DB/DataStreams/CollapsingSortedBlockInputStream.h
     include/DB/DataStreams/IBlockOutputStream.h
+    include/DB/DataStreams/InputStreamFromASTInsertQuery.h
     include/DB/DataStreams/AsynchronousBlockInputStream.h
     include/DB/DataStreams/MaterializingBlockInputStream.h
     include/DB/DataStreams/ParallelInputsProcessor.h
@@ -977,6 +1033,31 @@ add_library (dbms
     ${MONGODB_FILES}
 )
 
+######
+list(REMOVE_DUPLICATES dbms_all_sources_old)
+list(SORT dbms_all_sources_old)
+
+set(dbms_all_sources ${dbms_headers} ${dbms_sources})
+list(REMOVE_DUPLICATES dbms_all_sources)
+list(SORT dbms_all_sources)
+
+list(LENGTH dbms_all_sources_old len_old)
+list(LENGTH dbms_all_sources len_new)
+set(dbms_all_sources_union ${dbms_all_sources} ${dbms_all_sources_old})
+list(LENGTH dbms_all_sources_union len_union)
+list(REMOVE_DUPLICATES dbms_all_sources_union)
+list(LENGTH dbms_all_sources_union len_union_uniq)
+message(STATUS "len_old ${len_old}, len_new ${len_new}, len_union ${len_union}, len_union_uniq ${len_union_uniq}")
+
+set(dbms_all_sources_not_presented ${dbms_all_sources_old})
+list(REMOVE_ITEM dbms_all_sources_not_presented ${dbms_all_sources})
+message(STATUS "not presented: ${dbms_all_sources_not_presented}")
+
+set(dbms_all_sources_extra ${dbms_all_sources})
+list(REMOVE_ITEM dbms_all_sources_extra ${dbms_all_sources_old})
+message(STATUS "dbms_all_sources_extra: ${dbms_all_sources_extra}")
+######
+
 if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
     # Won't generate debug info for files with heavy template instantiation to achieve faster linking and lower size.
     set_source_files_properties(
diff --git a/dbms/include/DB/DataStreams/BlockIO.h b/dbms/include/DB/DataStreams/BlockIO.h
index 8169b3614fb..7f81eba193e 100644
--- a/dbms/include/DB/DataStreams/BlockIO.h
+++ b/dbms/include/DB/DataStreams/BlockIO.h
@@ -24,15 +24,15 @@ struct BlockIO
     Block in_sample;    /// Sample of the block that will be read from 'in'.
     Block out_sample;   /// Sample of the block that should be written to 'out'.
 
-    /// Здесь могут быть установлены колбэки для логгирования запроса.
-    std::function<void(IBlockInputStream *)> finish_callback;
-    std::function<void()> exception_callback;
+    /// Callbacks for query logging could be set here.
+    std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
+    std::function<void()> exception_callback;
 
     /// Call these functions if you need to log the query.
     void onFinish()
     {
         if (finish_callback)
-            finish_callback(in.get());
+            finish_callback(in.get(), out.get());
     }
 
     void onException()
diff --git a/dbms/include/DB/DataStreams/CountingBlockOutputStream.h b/dbms/include/DB/DataStreams/CountingBlockOutputStream.h
new file mode 100644
index 00000000000..8694e18c05d
--- /dev/null
+++ b/dbms/include/DB/DataStreams/CountingBlockOutputStream.h
@@ -0,0 +1,62 @@
+#pragma once
+#include <DB/DataStreams/IBlockOutputStream.h>
+#include <DB/IO/Progress.h>
+#include <DB/Interpreters/ProcessList.h>
+
+
+namespace DB
+{
+
+
+/// Proxy class which counts the number of written blocks, rows and bytes
+class CountingBlockOutputStream : public IBlockOutputStream
+{
+public:
+
+    CountingBlockOutputStream(const BlockOutputStreamPtr & stream_)
+        : stream(stream_) {}
+
+    void setProgressCallback(ProgressCallback callback)
+    {
+        progress_callback = callback;
+    }
+
+    void setProcessListElement(ProcessListElement * elem)
+    {
+        process_elem = elem;
+    }
+
+    const Progress & getProgress() const
+    {
+        return progress;
+    }
+
+    void write(const Block & block) override
+    {
+        stream->write(block);
+
+        Progress local_progress(block.rowsInFirstColumn(), block.bytes(), 0);
+        progress.incrementPiecewiseAtomically(local_progress);
+
+        if (process_elem)
+            process_elem->updateProgressOut(local_progress);
+
+        if (progress_callback)
+            progress_callback(local_progress);
+    }
+
+    void writePrefix() override                         { stream->writePrefix(); }
+    void writeSuffix() override                         { stream->writeSuffix(); }
+    void flush() override                               { stream->flush(); }
+    void onProgress(const Progress & progress) override { stream->onProgress(progress); }
+    String getContentType() const override              { return stream->getContentType(); }
+
+protected:
+
+    BlockOutputStreamPtr stream;
+    Progress progress;
+    ProgressCallback progress_callback;
+    ProcessListElement * process_elem = nullptr;
+};
+
+}
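The new header above is a plain counting proxy: it forwards every call to the wrapped stream and records how much passed through. As a standalone illustration of the pattern (not ClickHouse code — `Sink`, `Counts`, `CountingSink` and `NullSink` are hypothetical stand-ins for IBlockOutputStream, Progress and CountingBlockOutputStream):

```cpp
#include <cstddef>
#include <iostream>
#include <memory>

// Hypothetical stand-in for IBlockOutputStream: anything that consumes batches.
struct Sink
{
    virtual void write(size_t rows, size_t bytes) = 0;
    virtual ~Sink() = default;
};

// Hypothetical stand-in for Progress: accumulated counters.
struct Counts
{
    size_t rows = 0;
    size_t bytes = 0;
};

// The proxy idea from CountingBlockOutputStream: delegate the write,
// then record how much was written, without the wrapped sink knowing.
class CountingSink : public Sink
{
public:
    explicit CountingSink(std::shared_ptr<Sink> inner_) : inner(std::move(inner_)) {}

    void write(size_t rows, size_t bytes) override
    {
        inner->write(rows, bytes);   // delegate first, count after success
        counts.rows += rows;
        counts.bytes += bytes;
    }

    const Counts & getCounts() const { return counts; }

private:
    std::shared_ptr<Sink> inner;
    Counts counts;
};

struct NullSink : Sink
{
    void write(size_t, size_t) override {}
};

int main()
{
    CountingSink sink(std::make_shared<NullSink>());
    sink.write(100, 4096);
    sink.write(50, 2048);
    std::cout << sink.getCounts().rows << " rows, " << sink.getCounts().bytes << " bytes\n";
}
```

Because the proxy is transparent, callers keep writing to the same interface while the query pipeline gains per-INSERT accounting for free.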
diff --git a/dbms/include/DB/DataStreams/InputStreamFromASTInsertQuery.h b/dbms/include/DB/DataStreams/InputStreamFromASTInsertQuery.h
new file mode 100644
index 00000000000..28ccf8db1c8
--- /dev/null
+++ b/dbms/include/DB/DataStreams/InputStreamFromASTInsertQuery.h
@@ -0,0 +1,83 @@
+#pragma once
+#include <DB/IO/ConcatReadBuffer.h>
+#include <DB/DataStreams/IProfilingBlockInputStream.h>
+#include <DB/DataStreams/BlockIO.h>
+#include <DB/Parsers/ASTInsertQuery.h>
+#include <DB/Interpreters/Context.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+
+class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
+{
+public:
+
+    InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & istr, const BlockIO & streams, Context & context)
+    {
+        const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
+
+        if (!ast_insert_query)
+            throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
+
+        String format = ast_insert_query->format;
+        if (format.empty())
+            format = "Values";
+
+        /// Data could be in the already parsed (ast_insert_query.data) and in the not yet parsed (istr) part of the query.
+
+        buf1 = std::make_unique<ReadBuffer>(
+            const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
+
+        if (ast_insert_query->data)
+            buffers.push_back(buf1.get());
+        buffers.push_back(&istr);
+
+        /** NOTE Must not read from 'istr' before reading everything between 'ast_insert_query.data' and 'ast_insert_query.end',
+          * because 'query.data' could refer to a memory piece used as the buffer for 'istr'.
+          */
+
+        data_istr = std::make_unique<ConcatReadBuffer>(buffers);
+
+        res_stream = context.getInputFormat(format, *data_istr, streams.out_sample, context.getSettings().max_insert_block_size);
+    }
+
+    Block readImpl() override
+    {
+        return res_stream->read();
+    }
+
+    void readPrefixImpl() override
+    {
+        return res_stream->readPrefix();
+    }
+
+    void readSuffixImpl() override
+    {
+        return res_stream->readSuffix();
+    }
+
+    String getName() const override
+    {
+        return "InputStreamFromASTInsertQuery";
+    }
+
+    String getID() const override
+    {
+        return "InputStreamFromASTInsertQuery(" + toString(this) + ")";
+    }
+
+private:
+    ConcatReadBuffer::ReadBuffers buffers;
+    std::unique_ptr<ReadBuffer> buf1;
+    std::unique_ptr<ConcatReadBuffer> data_istr;
+
+    BlockInputStreamPtr res_stream;
+};
+
+}
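The two-buffer concatenation is easiest to see in isolation. Below is a standalone sketch — not ClickHouse's ConcatReadBuffer; `ChainedReader` and its helpers are hypothetical — showing how reading drains the in-memory parsed fragment completely before touching the live stream, which is exactly why the NOTE above forbids reading from 'istr' first:

```cpp
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical analogue of ConcatReadBuffer: presents several sources
// as one contiguous stream, strictly in order.
class ChainedReader
{
public:
    void append(std::istream * s) { sources.push_back(s); }

    // Read up to n bytes, draining source i completely before source i+1.
    std::string read(size_t n)
    {
        std::string out;
        while (out.size() < n && index < sources.size())
        {
            char c;
            if (sources[index]->get(c))
                out.push_back(c);
            else
                ++index;   // current source exhausted: move to the next one
        }
        return out;
    }

private:
    std::vector<std::istream *> sources;
    size_t index = 0;
};

int main()
{
    // "parsed" imitates ast_insert_query->data..end: INSERT data the SQL
    // parser already consumed into its own buffer.
    std::istringstream parsed("(1, 'a'), ");
    // "rest" imitates istr: the part of the query still sitting in the input buffer.
    std::istringstream rest("(2, 'b'), (3, 'c')");

    ChainedReader reader;
    reader.append(&parsed);
    reader.append(&rest);

    std::cout << reader.read(100) << '\n';  // (1, 'a'), (2, 'b'), (3, 'c')
}
```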
diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h
index 7bd010cb2f0..02e84c8f4b5 100644
--- a/dbms/include/DB/Interpreters/ProcessList.h
+++ b/dbms/include/DB/Interpreters/ProcessList.h
@@ -37,9 +37,11 @@ struct ProcessInfo
 {
     String query;
     double elapsed_seconds;
-    size_t rows;
-    size_t bytes;
+    size_t read_rows;
+    size_t read_bytes;
     size_t total_rows;
+    size_t written_rows;
+    size_t written_bytes;
     Int64 memory_usage;
     ClientInfo client_info;
 };
@@ -53,7 +55,10 @@ struct ProcessListElement
 
     Stopwatch watch;
 
-    Progress progress;
+    /// Progress of input stream
+    Progress progress_in;
+    /// Progress of output stream
+    Progress progress_out;
 
     MemoryTracker memory_tracker;
 
@@ -88,9 +93,9 @@ struct ProcessListElement
         current_memory_tracker = nullptr;
     }
 
-    bool update(const Progress & value)
+    bool updateProgressIn(const Progress & value)
     {
-        progress.incrementPiecewiseAtomically(value);
+        progress_in.incrementPiecewiseAtomically(value);
 
         if (priority_handle)
             priority_handle->waitIfNeed(std::chrono::seconds(1));    /// NOTE Could make timeout customizable.
@@ -98,6 +103,13 @@ struct ProcessListElement
         return !is_cancelled;
     }
 
+    bool updateProgressOut(const Progress & value)
+    {
+        progress_out.incrementPiecewiseAtomically(value);
+        return !is_cancelled;
+    }
+
+
     ProcessInfo getInfo() const
     {
         ProcessInfo res;
@@ -105,9 +117,11 @@ struct ProcessListElement
         res.query = query;
         res.client_info = client_info;
         res.elapsed_seconds = watch.elapsedSeconds();
-        res.rows = progress.rows;
-        res.bytes = progress.bytes;
-        res.total_rows = progress.total_rows;
+        res.read_rows = progress_in.rows;
+        res.read_bytes = progress_in.bytes;
+        res.total_rows = progress_in.total_rows;
+        res.written_rows = progress_out.rows;
+        res.written_bytes = progress_out.bytes;
         res.memory_usage = memory_tracker.get();
 
         return res;
diff --git a/dbms/include/DB/Interpreters/QueryLog.h b/dbms/include/DB/Interpreters/QueryLog.h
index 3e37e199ad9..f04cfb0f614 100644
--- a/dbms/include/DB/Interpreters/QueryLog.h
+++ b/dbms/include/DB/Interpreters/QueryLog.h
@@ -7,14 +7,13 @@
 namespace DB
 {
 
-/** Позволяет логгировать информацию о выполнении запросов:
- *  - о начале выполнения запроса;
- *  - метрики производительности, после выполнения запроса;
- *  - об ошибках при выполнении запроса.
+/** Allows logging information about query execution:
+ *  - information about the start of query execution;
+ *  - performance metrics (set at the end of query execution);
+ *  - information about errors during query execution.
  */
 
-/** Что логгировать.
- */
+/// A struct which will be inserted as a row into the query_log table
 struct QueryLogElement
 {
     enum Type
@@ -27,7 +26,7 @@ struct QueryLogElement
 
     Type type = QUERY_START;
 
-    /// В зависимости от типа, не все поля могут быть заполнены.
+    /// Depending on the type of query and type of stage, not all the fields may be filled.
 
     time_t event_time{};
    time_t query_start_time{};
@@ -36,6 +35,11 @@ struct QueryLogElement
     UInt64 read_rows{};
     UInt64 read_bytes{};
 
+    UInt64 written_rows{};
+    UInt64 written_bytes{};
+
+    /// NOTE: Not an obvious metric.
+    /// At the end of query execution it is always approximately equal to read_rows or to written_rows.
     UInt64 result_rows{};
     UInt64 result_bytes{};
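The split of one `Progress` into `progress_in`/`progress_out` relies on each counter being updated "piecewise atomically": every field is an independent atomic, so concurrent updates never lose increments even though a reader may observe a mix of old and new fields. A minimal standalone sketch of that idea (hypothetical names, not ClickHouse's Progress class):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

// Hypothetical analogue of Progress::incrementPiecewiseAtomically: each field
// is an independent atomic, so readers may see a mix of old and new values
// ("piecewise"), but every individual counter is exact.
struct ProgressCounters
{
    std::atomic<uint64_t> rows{0};
    std::atomic<uint64_t> bytes{0};

    void incrementPiecewiseAtomically(uint64_t r, uint64_t b)
    {
        rows.fetch_add(r, std::memory_order_relaxed);
        bytes.fetch_add(b, std::memory_order_relaxed);
    }
};

// Analogue of the split in ProcessListElement: one set of counters per direction.
struct QueryProgress
{
    ProgressCounters in;   // what the query has read
    ProgressCounters out;  // what the query has written (INSERT path)
};

int main()
{
    QueryProgress progress;

    // Reading and writing threads update their own direction independently.
    std::thread reader([&] { for (int i = 0; i < 1000; ++i) progress.in.incrementPiecewiseAtomically(10, 800); });
    std::thread writer([&] { for (int i = 0; i < 1000; ++i) progress.out.incrementPiecewiseAtomically(10, 800); });
    reader.join();
    writer.join();

    std::cout << "read "    << progress.in.rows  << " rows, "
              << "written " << progress.out.rows << " rows\n";
}
```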
diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h
index 28b09100da5..5a7882aeee3 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeList.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeList.h
@@ -34,20 +34,13 @@ struct MergeInfo
     std::atomic<UInt64> bytes_read_uncompressed{};
     std::atomic<UInt64> bytes_written_uncompressed{};
 
-    /// Updated only for Horizontal algorithm
+    /// In case of the Vertical algorithm, these are valid only for primary key columns
     std::atomic<UInt64> rows_read{};
     std::atomic<UInt64> rows_written{};
 
     /// Updated only for Vertical algorithm
-    /// mutually exclusive with rows_read and rows_written, updated either rows_written either columns_written
     std::atomic<UInt64> columns_written{};
 
-    /// Updated in both cases
-    /// Number of rows for which primary key columns have been written
-    std::atomic<UInt64> rows_with_key_columns_read{};
-    std::atomic<UInt64> rows_with_key_columns_written{};
-
-
     MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
         : database{database}, table{table}, result_part_name{result_part_name}
     {
@@ -66,9 +59,7 @@ struct MergeInfo
         bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
         rows_read(other.rows_read.load(std::memory_order_relaxed)),
         rows_written(other.rows_written.load(std::memory_order_relaxed)),
-        columns_written(other.columns_written.load(std::memory_order_relaxed)),
-        rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)),
-        rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed))
+        columns_written(other.columns_written.load(std::memory_order_relaxed))
     {
     }
 };
diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp
index d3055c53ac9..51adad29139 100644
--- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp
@@ -6,7 +6,6 @@
 #include
 #include
 
-
 namespace DB
 {
 
@@ -228,15 +227,15 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
 
     if (process_list_elem)
     {
-        if (!process_list_elem->update(value))
+        if (!process_list_elem->updateProgressIn(value))
             cancel();
 
         /// The total amount of data processed or expected to be processed in all leaf sources, possibly on remote servers.
 
-        size_t rows_processed = process_list_elem->progress.rows;
-        size_t bytes_processed = process_list_elem->progress.bytes;
+        size_t rows_processed = process_list_elem->progress_in.rows;
+        size_t bytes_processed = process_list_elem->progress_in.bytes;
 
-        size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress.total_rows.load(std::memory_order_relaxed));
+        size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress_in.total_rows.load(std::memory_order_relaxed));
 
        /** Check the limits on the amount of data to read, the query execution speed, and the quota on the amount of data to read.
          * NOTE: Maybe it makes sense to check them directly in ProcessList?
          */
@@ -270,7 +269,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
                 throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
         }
 
-        size_t total_rows = process_list_elem->progress.total_rows;
+        size_t total_rows = process_list_elem->progress_in.total_rows;
 
         if (limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
         {
@@ -283,7 +282,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
                     + " rows/sec., minimum: " + toString(limits.min_execution_speed),
                     ErrorCodes::TOO_SLOW);
 
-            size_t total_rows = process_list_elem->progress.total_rows;
+            size_t total_rows = process_list_elem->progress_in.total_rows;
 
             /// If the predicted execution time is greater than max_execution_time.
             if (limits.max_execution_time != 0 && total_rows)
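For reference, the two speed checks touched by this hunk work as follows: the current throughput is compared against a floor (`min_execution_speed`, raising TOO_SLOW if below), and the total runtime is extrapolated from progress so far and compared against `max_execution_time`. A simplified standalone sketch — field and function names here are illustrative, not the actual LimitsCheck API:

```cpp
#include <cstdint>
#include <iostream>
#include <stdexcept>

// Hypothetical, simplified version of the limits consulted in progressImpl.
struct SpeedLimits
{
    uint64_t min_execution_speed = 0;   // rows/sec; 0 disables the check
    double   max_execution_time  = 0;   // seconds;  0 disables the check
};

void checkSpeed(uint64_t rows_processed, uint64_t total_rows_estimate,
                double elapsed_seconds, const SpeedLimits & limits)
{
    if (limits.min_execution_speed && rows_processed / elapsed_seconds < limits.min_execution_speed)
        throw std::runtime_error("Query is executing too slow");  // ~ ErrorCodes::TOO_SLOW

    // Predicted total time = elapsed / fraction done; abort early if it
    // already cannot fit into max_execution_time.
    if (limits.max_execution_time && total_rows_estimate)
    {
        double predicted = elapsed_seconds * total_rows_estimate / rows_processed;
        if (predicted > limits.max_execution_time)
            throw std::runtime_error("Estimated execution time exceeds max_execution_time");
    }
}

int main()
{
    SpeedLimits limits{1000, 60.0};
    checkSpeed(50000, 100000, 10.0, limits);      // 5000 rows/sec, predicted 20 s: OK
    try
    {
        checkSpeed(5000, 1000000, 10.0, limits);  // 500 rows/sec: too slow
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }
}
```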
diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp
index 89b951d7b01..02e0064fafc 100644
--- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include <DB/DataStreams/CountingBlockOutputStream.h>
 
 #include
 #include
@@ -101,6 +102,10 @@ BlockIO InterpreterInsertQuery::execute()
         context.getSettingsRef().min_insert_block_size_rows,
         context.getSettingsRef().min_insert_block_size_bytes);
 
+    auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
+    out_wrapper->setProcessListElement(context.getProcessListElement());
+    out = std::move(out_wrapper);
+
     BlockIO res;
     res.out_sample = getSampleBlock();
diff --git a/dbms/src/Interpreters/QueryLog.cpp b/dbms/src/Interpreters/QueryLog.cpp
index a39b695ef5b..5a047036159 100644
--- a/dbms/src/Interpreters/QueryLog.cpp
+++ b/dbms/src/Interpreters/QueryLog.cpp
@@ -28,6 +28,9 @@ Block QueryLogElement::createBlock()
         {std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "read_rows"},
         {std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "read_bytes"},
 
+        {std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "written_rows"},
+        {std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "written_bytes"},
+
         {std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "result_rows"},
         {std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "result_bytes"},
@@ -102,6 +105,9 @@ void QueryLogElement::appendToBlock(Block & block) const
     block.unsafeGetByPosition(i++).column->insert(UInt64(read_rows));
     block.unsafeGetByPosition(i++).column->insert(UInt64(read_bytes));
 
+    block.unsafeGetByPosition(i++).column->insert(UInt64(written_rows));
+    block.unsafeGetByPosition(i++).column->insert(UInt64(written_bytes));
+
     block.unsafeGetByPosition(i++).column->insert(UInt64(result_rows));
     block.unsafeGetByPosition(i++).column->insert(UInt64(result_bytes));
diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp
index 7f8f8b488d0..d29e04d6b44 100644
--- a/dbms/src/Interpreters/executeQuery.cpp
+++ b/dbms/src/Interpreters/executeQuery.cpp
@@ -6,6 +6,8 @@
 #include
 #include
 #include
+#include <DB/DataStreams/InputStreamFromASTInsertQuery.h>
+#include <DB/DataStreams/CountingBlockOutputStream.h>
 
 #include
 #include
@@ -191,13 +193,23 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
 
     if (res.in)
     {
-        if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
+        if (auto stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
         {
             stream->setProgressCallback(context.getProgressCallback());
             stream->setProcessListElement(context.getProcessListElement());
         }
     }
 
+    if (res.out)
+    {
+        if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
+        {
+            stream->setProcessListElement(context.getProcessListElement());
+        }
+    }
+
+    if (!res.out || !dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
+        LOG_DEBUG(&Logger::get("executeQuery"), "res.out " << res.out.get() << " is empty");
+
     /// Everything related to query log.
     {
         QueryLogElement elem;
@@ -218,7 +230,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
             context.getQueryLog().add(elem);
 
         /// Also make possible for caller to log successful query finish and exception during execution.
-        res.finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream) mutable
+        res.finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
         {
             ProcessListElement * process_list_elem = context.getProcessListElement();
@@ -232,22 +244,35 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
             elem.event_time = time(0);
             elem.query_duration_ms = elapsed_seconds * 1000;
 
-            elem.read_rows = process_list_elem->progress.rows;
-            elem.read_bytes = process_list_elem->progress.bytes;
+            elem.read_rows = process_list_elem->progress_in.rows;
+            elem.read_bytes = process_list_elem->progress_in.bytes;
+
+            elem.written_rows = process_list_elem->progress_out.rows;
+            elem.written_bytes = process_list_elem->progress_out.bytes;
 
             auto memory_usage = process_list_elem->memory_tracker.getPeak();
             elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
 
-            if (stream)
+            if (stream_in)
             {
-                if (IProfilingBlockInputStream * profiling_stream = dynamic_cast<IProfilingBlockInputStream *>(stream))
+                if (auto profiling_stream = dynamic_cast<IProfilingBlockInputStream *>(stream_in))
                 {
                     const BlockStreamProfileInfo & info = profiling_stream->getProfileInfo();
 
+                    /// NOTE: INSERT SELECT queries contain zero metrics here
                     elem.result_rows = info.rows;
                     elem.result_bytes = info.bytes;
                 }
             }
+            else if (stream_out) /// will be used only for ordinary INSERT queries
+            {
+                if (auto counting_stream = dynamic_cast<CountingBlockOutputStream *>(stream_out))
+                {
+                    /// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.
+                    elem.result_rows = counting_stream->getProgress().rows;
+                    elem.result_bytes = counting_stream->getProgress().bytes;
+                }
+            }
 
             if (elem.read_rows != 0)
             {
@@ -280,8 +305,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
 
             elem.query_duration_ms = elapsed_seconds * 1000;
 
-            elem.read_rows = process_list_elem->progress.rows;
-            elem.read_bytes = process_list_elem->progress.bytes;
+            elem.read_rows = process_list_elem->progress_in.rows;
+            elem.read_bytes = process_list_elem->progress_in.bytes;
 
             auto memory_usage = process_list_elem->memory_tracker.getPeak();
             elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
@@ -369,35 +394,8 @@ void executeQuery(
     {
         if (streams.out)
         {
-            const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
-
-            if (!ast_insert_query)
-                throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
-
-            String format = ast_insert_query->format;
-            if (format.empty())
-                format = "Values";
-
-            /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (istr) part of query.
-
-            ConcatReadBuffer::ReadBuffers buffers;
-            ReadBuffer buf1(const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
-
-            if (ast_insert_query->data)
-                buffers.push_back(&buf1);
-            buffers.push_back(&istr);
-
-            /** NOTE Must not read from 'istr' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
-              * - because 'query.data' could refer to memory piece, used as buffer for 'istr'.
-              */
-
-            ConcatReadBuffer data_istr(buffers);
-
-            BlockInputStreamPtr in{
-                context.getInputFormat(
-                    format, data_istr, streams.out_sample, context.getSettings().max_insert_block_size)};
-
-            copyData(*in, *streams.out);
+            InputStreamFromASTInsertQuery in(ast, istr, streams, context);
+            copyData(in, *streams.out);
         }
 
         if (streams.in)
@@ -410,7 +408,7 @@ void executeQuery(
 
             BlockOutputStreamPtr out = context.getOutputFormat(format_name, ostr, streams.in_sample);
 
-            if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
+            if (auto stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
             {
                 /// NOTE Progress callback takes shared ownership of 'out'.
                 stream->setProgressCallback([out] (const Progress & progress) { out->onProgress(progress); });
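The reworked finish_callback above now receives both the input and the output stream and picks its result metrics from whichever side is present: the profiling input stream for SELECT-like queries, the counting output stream for plain INSERTs. A compact standalone sketch of that dispatch (the `InMetrics`/`OutMetrics`/`QueryLogRow` types are hypothetical stand-ins, not ClickHouse's classes):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

// Hypothetical stand-ins for the two metric sources in the new finish_callback:
// a profiling input stream (SELECT path) and a counting output stream (INSERT path).
struct InMetrics  { uint64_t rows, bytes; };
struct OutMetrics { uint64_t rows, bytes; };

struct QueryLogRow { uint64_t result_rows = 0, result_bytes = 0; };

// Mirrors the dispatch in executeQueryImpl: prefer the input-side profile info;
// for plain INSERTs (no result stream) fall back to the output-side counters.
QueryLogRow makeLogRow(const std::optional<InMetrics> & in, const std::optional<OutMetrics> & out)
{
    QueryLogRow row;
    if (in)       { row.result_rows = in->rows;  row.result_bytes = in->bytes; }
    else if (out) { row.result_rows = out->rows; row.result_bytes = out->bytes; }
    return row;
}

int main()
{
    auto select_row = makeLogRow(InMetrics{42, 1024}, std::nullopt);      // SELECT
    auto insert_row = makeLogRow(std::nullopt, OutMetrics{1000, 65536});  // INSERT
    std::cout << select_row.result_rows << ' ' << insert_row.result_rows << '\n';
}
```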
- */ - - ConcatReadBuffer data_istr(buffers); - - BlockInputStreamPtr in{ - context.getInputFormat( - format, data_istr, streams.out_sample, context.getSettings().max_insert_block_size)}; - - copyData(*in, *streams.out); + InputStreamFromASTInsertQuery in(ast, istr, streams, context); + copyData(in, *streams.out); } if (streams.in) @@ -410,7 +408,7 @@ void executeQuery( BlockOutputStreamPtr out = context.getOutputFormat(format_name, ostr, streams.in_sample); - if (IProfilingBlockInputStream * stream = dynamic_cast(streams.in.get())) + if (auto stream = dynamic_cast(streams.in.get())) { /// NOTE Progress callback takes shared ownership of 'out'. stream->setProgressCallback([out] (const Progress & progress) { out->onProgress(progress); }); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 7b0b5710e1e..b79c693f04e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -392,10 +392,9 @@ public: const ColumnSizeEstimator & column_sizes) : merge_entry(merge_entry_), merge_alg(merge_alg_) { - if (merge_alg == MergeAlgorithm::Horizontal) - average_elem_progress = 1.0 / num_total_rows; - else - average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows); + average_elem_progress = (merge_alg == MergeAlgorithm::Horizontal) + ? 1.0 / num_total_rows + : column_sizes.keyColumnsProgress(1, num_total_rows); } MergeList::Entry & merge_entry; @@ -405,19 +404,11 @@ public: void operator() (const Progress & value) { ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes); - merge_entry->bytes_read_uncompressed += value.bytes; - merge_entry->rows_with_key_columns_read += value.rows; + ProfileEvents::increment(ProfileEvents::MergedRows, value.rows); - if (merge_alg == MergeAlgorithm::Horizontal) - { - ProfileEvents::increment(ProfileEvents::MergedRows, value.rows); - merge_entry->rows_read += value.rows; - merge_entry->progress = average_elem_progress * merge_entry->rows_read; - } - else - { - merge_entry->progress = average_elem_progress * merge_entry->rows_with_key_columns_read; - } + merge_entry->bytes_read_uncompressed += value.bytes; + merge_entry->rows_read += value.rows; + merge_entry->progress = average_elem_progress * merge_entry->rows_read; }; }; @@ -597,9 +588,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart rows_written += block.rows(); to.write(block); - if (merge_alg == MergeAlgorithm::Horizontal) - merge_entry->rows_written = merged_stream->getProfileInfo().rows; - merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows; + merge_entry->rows_written = merged_stream->getProfileInfo().rows; merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes; /// This update is unactual for VERTICAL algorithm sicne it requires more accurate per-column updates @@ -619,7 +608,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart /// Gather ordinary columns if (merge_alg == MergeAlgorithm::Vertical) { - size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read; + size_t sum_input_rows_exact = merge_entry->rows_read; merge_entry->columns_written = merging_column_names.size(); merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact); diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index 
diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp
index 4bdce11acc0..0982c6f4ca6 100644
--- a/dbms/src/Storages/System/StorageSystemMerges.cpp
+++ b/dbms/src/Storages/System/StorageSystemMerges.cpp
@@ -25,9 +25,7 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
         { "rows_read", std::make_shared<DataTypeUInt64>() },
         { "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
         { "rows_written", std::make_shared<DataTypeUInt64>() },
-        { "columns_written", std::make_shared<DataTypeUInt64>() },
-        { "rows_with_key_columns_read", std::make_shared<DataTypeUInt64>() },
-        { "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() }
+        { "columns_written", std::make_shared<DataTypeUInt64>() }
     }
 {
 }
@@ -62,8 +60,6 @@ BlockInputStreams StorageSystemMerges::read(
     ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_written_uncompressed"};
     ColumnWithTypeAndName col_rows_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_written"};
     ColumnWithTypeAndName col_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "columns_written"};
-    ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_read"};
-    ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"};
 
     for (const auto & merge : context.getMergeList().get())
     {
@@ -80,8 +76,6 @@ BlockInputStreams StorageSystemMerges::read(
         col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed));
         col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed));
         col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed));
-        col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed));
-        col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed));
     }
 
     Block block{
@@ -97,9 +91,7 @@ BlockInputStreams StorageSystemMerges::read(
         col_rows_read,
         col_bytes_written_uncompressed,
         col_rows_written,
-        col_columns_written,
-        col_rows_with_key_columns_read,
-        col_rows_with_key_columns_written
+        col_columns_written
     };
 
     return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)};
diff --git a/dbms/src/Storages/System/StorageSystemProcesses.cpp b/dbms/src/Storages/System/StorageSystemProcesses.cpp
index 6fc9f4fd8af..d397e8bfd7d 100644
--- a/dbms/src/Storages/System/StorageSystemProcesses.cpp
+++ b/dbms/src/Storages/System/StorageSystemProcesses.cpp
@@ -41,11 +41,13 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_)
         { "quota_key", std::make_shared<DataTypeString>() },
 
         { "elapsed", std::make_shared<DataTypeFloat64>() },
-        { "rows_read", std::make_shared<DataTypeUInt64>() },
-        { "bytes_read", std::make_shared<DataTypeUInt64>() },
-        { "total_rows_approx", std::make_shared<DataTypeUInt64>() },
+        { "read_rows", std::make_shared<DataTypeUInt64>() },
+        { "read_bytes", std::make_shared<DataTypeUInt64>() },
+        { "total_rows_approx", std::make_shared<DataTypeUInt64>() },
+        { "written_rows", std::make_shared<DataTypeUInt64>() },
+        { "written_bytes", std::make_shared<DataTypeUInt64>() },
         { "memory_usage", std::make_shared<DataTypeInt64>() },
-        { "query", std::make_shared<DataTypeString>() },
+        { "query", std::make_shared<DataTypeString>() }
     }
 {
 }
@@ -95,9 +97,11 @@ BlockInputStreams StorageSystemProcesses::read(
         block.unsafeGetByPosition(i++).column->insert(process.client_info.http_user_agent);
         block.unsafeGetByPosition(i++).column->insert(process.client_info.quota_key);
 
         block.unsafeGetByPosition(i++).column->insert(process.elapsed_seconds);
-        block.unsafeGetByPosition(i++).column->insert(process.rows);
-        block.unsafeGetByPosition(i++).column->insert(process.bytes);
+        block.unsafeGetByPosition(i++).column->insert(process.read_rows);
+        block.unsafeGetByPosition(i++).column->insert(process.read_bytes);
         block.unsafeGetByPosition(i++).column->insert(process.total_rows);
+        block.unsafeGetByPosition(i++).column->insert(process.written_rows);
+        block.unsafeGetByPosition(i++).column->insert(process.written_bytes);
         block.unsafeGetByPosition(i++).column->insert(process.memory_usage);
         block.unsafeGetByPosition(i++).column->insert(process.query);
     }
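Note why both system-table edits touch two places: rows are filled positionally with an `i++` cursor, so every insert in `read()` must line up with the column declared at the same index in the constructor. Adding `written_rows`/`written_bytes` to the header therefore forces a matching insert at the same position in the row loop. A toy standalone analogue of that positional contract (hypothetical `Column`/`Cell` types, not ClickHouse's Block):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <variant>
#include <vector>

// Hypothetical analogue of filling a block by position: the i-th insert must
// line up with the i-th declared column, exactly as with unsafeGetByPosition(i++).
using Cell = std::variant<uint64_t, std::string>;

struct Column
{
    std::string name;
    std::vector<Cell> values;
};

int main()
{
    std::vector<Column> block = {
        {"read_rows", {}}, {"read_bytes", {}}, {"total_rows_approx", {}},
        {"written_rows", {}}, {"written_bytes", {}}, {"query", {}},
    };

    // One row, inserted strictly in header order (a positional cursor, like i++).
    size_t i = 0;
    for (Cell cell : {Cell{uint64_t{1000}}, Cell{uint64_t{65536}}, Cell{uint64_t{2000}},
                      Cell{uint64_t{500}}, Cell{uint64_t{32768}}, Cell{std::string{"INSERT INTO t VALUES"}}})
        block[i++].values.push_back(cell);

    std::cout << block[3].name << '\n';  // written_rows
}
```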
diff --git a/dbms/tests/clickhouse-test b/dbms/tests/clickhouse-test
index 661ebaca64e..024961d2e59 100755
--- a/dbms/tests/clickhouse-test
+++ b/dbms/tests/clickhouse-test
@@ -30,17 +30,17 @@ MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", "cyan", attrs=['bold']) +
 
 def main(args):
     SERVER_DIED = False
-    
-    
+
+
     def is_data_present():
         proc = Popen(args.client, stdin=PIPE, stdout=PIPE, stderr=PIPE)
         (stdout, stderr) = proc.communicate("EXISTS TABLE test.hits")
         if proc.returncode != 0:
             raise CalledProcessError(proc.returncode, args.client, stderr)
-    
+
         return stdout.startswith('1')
-    
-    
+
+
     def dump_report(destination, suite, test_case, report):
         if destination is not None:
             destination_file = os.path.join(destination, suite, test_case + ".xml")
@@ -53,23 +53,23 @@ def main(args):
             report_suite.append(report)
             report_root.append(report_suite)
             report_file.write(et.tostring(report_root, encoding = "UTF-8", xml_declaration=True, pretty_print=True))
-    
-    
+
+
     if args.zookeeper is None:
         try:
            check_call(['grep', '-q', ' 0:
         print(colored("\nHaving {0} errors!".format(failures_total), "red", attrs=["bold"]))
         sys.exit(1)
     else:
         print(colored("\nAll tests passed.", "green", attrs=["bold"]))
         sys.exit(0)
+
 
 if __name__ == '__main__':
     parser = ArgumentParser(description = 'ClickHouse functional tests')
@@ -214,11 +214,11 @@ if __name__ == '__main__':
     parser.add_argument('-o', '--output', help = 'Output xUnit compliant test report directory')
     parser.add_argument('-t', '--timeout', type = int, default = 600, help = 'Timeout for each test case in seconds')
     parser.add_argument('test', nargs = '?', help = 'Optional test case name regex')
-    
+
     group = parser.add_mutually_exclusive_group(required = False)
     group.add_argument('--zookeeper', action = 'store_true', default = None, dest = 'zookeeper', help = 'Run zookeeper related tests')
     group.add_argument('--no-zookeeper', action = 'store_false', default = None, dest = 'zookeeper', help = 'Do not run zookeeper related tests')
-    
+
     args = parser.parse_args()
-    
+
     main(args)