diff --git a/programs/copier/Internals.cpp b/programs/copier/Internals.cpp index c5e702cd1dc..84283777c8f 100644 --- a/programs/copier/Internals.cpp +++ b/programs/copier/Internals.cpp @@ -57,14 +57,6 @@ std::shared_ptr createASTStorageDistributed( } -BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream) -{ - return std::make_shared( - stream, - std::numeric_limits::max(), - std::numeric_limits::max()); -} - Block getBlockWithAllStreamData(QueryPipeline pipeline) { QueryPipelineBuilder builder; diff --git a/programs/copier/Internals.h b/programs/copier/Internals.h index 9e648060592..a9d8ca726fe 100644 --- a/programs/copier/Internals.h +++ b/programs/copier/Internals.h @@ -50,8 +50,6 @@ #include #include #include -#include -#include #include #include #include diff --git a/programs/library-bridge/Handlers.cpp b/programs/library-bridge/Handlers.cpp index 2f6dca5ee65..abc5118baad 100644 --- a/programs/library-bridge/Handlers.cpp +++ b/programs/library-bridge/Handlers.cpp @@ -1,7 +1,6 @@ #include "Handlers.h" #include "SharedLibraryHandlerFactory.h" -#include #include #include #include @@ -10,11 +9,13 @@ #include #include #include -#include #include +#include #include #include +#include #include +#include #include #include @@ -189,8 +190,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe ReadBufferFromString read_block_buf(params.get("null_values")); auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE); - auto reader = std::make_shared(format); - auto sample_block_with_nulls = reader->read(); + QueryPipeline pipeline(Pipe(std::move(format))); + PullingPipelineExecutor executor(pipeline); + Block sample_block_with_nulls; + executor.pull(sample_block_with_nulls); LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure()); @@ -281,8 +284,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe auto & read_buf = request.getStream(); auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE); - auto reader = std::make_shared(format); - auto block = reader->read(); + QueryPipeline pipeline(std::move(format)); + PullingPipelineExecutor executor(pipeline); + Block block; + executor.pull(block); auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id); if (!library_handler) diff --git a/programs/library-bridge/SharedLibraryHandler.h b/programs/library-bridge/SharedLibraryHandler.h index f9d2fe43cb2..de1d098dc8d 100644 --- a/programs/library-bridge/SharedLibraryHandler.h +++ b/programs/library-bridge/SharedLibraryHandler.h @@ -2,7 +2,6 @@ #include #include -#include #include "LibraryUtils.h" diff --git a/programs/odbc-bridge/MainHandler.cpp b/programs/odbc-bridge/MainHandler.cpp index 51abe207095..6a2e967d179 100644 --- a/programs/odbc-bridge/MainHandler.cpp +++ b/programs/odbc-bridge/MainHandler.cpp @@ -4,7 +4,6 @@ #include "ODBCBlockInputStream.h" #include "ODBCBlockOutputStream.h" #include "getIdentifierQuote.h" -#include #include #include #include diff --git a/src/Client/IServerConnection.h b/src/Client/IServerConnection.h index 5584cf72bbf..42886c72182 100644 --- a/src/Client/IServerConnection.h +++ b/src/Client/IServerConnection.h @@ -6,7 +6,6 @@ #include #include -#include #include #include diff --git a/src/Core/ExternalTable.cpp b/src/Core/ExternalTable.cpp index 7619d60d84e..4dd8b0cf016 100644 --- a/src/Core/ExternalTable.cpp +++ b/src/Core/ExternalTable.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include diff --git a/src/DataStreams/BlockStreamProfileInfo.cpp b/src/DataStreams/BlockStreamProfileInfo.cpp index 09ad8a8e4ac..9a06d905223 100644 --- a/src/DataStreams/BlockStreamProfileInfo.cpp +++ b/src/DataStreams/BlockStreamProfileInfo.cpp @@ -1,5 +1,4 @@ #include -#include #include #include @@ -47,16 +46,12 @@ void BlockStreamProfileInfo::setFrom(const BlockStreamProfileInfo & rhs, bool sk size_t BlockStreamProfileInfo::getRowsBeforeLimit() const { - if (!calculated_rows_before_limit) - calculateRowsBeforeLimit(); return rows_before_limit; } bool BlockStreamProfileInfo::hasAppliedLimit() const { - if (!calculated_rows_before_limit) - calculateRowsBeforeLimit(); return applied_limit; } @@ -73,74 +68,4 @@ void BlockStreamProfileInfo::update(size_t num_rows, size_t num_bytes) bytes += num_bytes; } - -void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const -{ - if (!parent) - return; - - if (parent->getName() == name) - { - res.push_back(this); - return; - } - - parent->forEachChild([&] (IBlockInputStream & child) - { - child.getProfileInfo().collectInfosForStreamsWithName(name, res); - return false; - }); -} - - -void BlockStreamProfileInfo::calculateRowsBeforeLimit() const -{ - calculated_rows_before_limit = true; - - /// is there a Limit? - BlockStreamProfileInfos limits; - collectInfosForStreamsWithName("Limit", limits); - - if (!limits.empty()) - { - applied_limit = true; - - /** Take the number of lines read below `PartialSorting`, if any, or below `Limit`. - * This is necessary, because sorting can return only part of the rows. - */ - BlockStreamProfileInfos partial_sortings; - collectInfosForStreamsWithName("PartialSorting", partial_sortings); - - BlockStreamProfileInfos & limits_or_sortings = partial_sortings.empty() ? limits : partial_sortings; - - for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings) - { - info_limit_or_sort->parent->forEachChild([&] (IBlockInputStream & child) - { - rows_before_limit += child.getProfileInfo().rows; - return false; - }); - } - } - else - { - /// Then the data about `rows_before_limit` can be in `RemoteBlockInputStream` (come from a remote server). - BlockStreamProfileInfos remotes; - collectInfosForStreamsWithName("Remote", remotes); - collectInfosForStreamsWithName("TreeExecutor", remotes); - - if (remotes.empty()) - return; - - for (const auto & info : remotes) - { - if (info->applied_limit) - { - applied_limit = true; - rows_before_limit += info->rows_before_limit; - } - } - } -} - } diff --git a/src/DataStreams/BlockStreamProfileInfo.h b/src/DataStreams/BlockStreamProfileInfo.h index 688bdfc91fc..1707b941445 100644 --- a/src/DataStreams/BlockStreamProfileInfo.h +++ b/src/DataStreams/BlockStreamProfileInfo.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -16,9 +15,6 @@ class WriteBuffer; /// Information for profiling. See IBlockInputStream.h struct BlockStreamProfileInfo { - /// Info about stream object this profile info refers to. - IBlockInputStream * parent = nullptr; - bool started = false; Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time @@ -28,9 +24,6 @@ struct BlockStreamProfileInfo using BlockStreamProfileInfos = std::vector; - /// Collect BlockStreamProfileInfo for the nearest sources in the tree named `name`. Example; collect all info for PartialSorting streams. - void collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const; - /** Get the number of rows if there were no LIMIT. * If there is no LIMIT, 0 is returned. * If the query does not contain ORDER BY, the number can be underestimated - return the number of rows in blocks that were read before LIMIT reached. @@ -59,8 +52,6 @@ struct BlockStreamProfileInfo } private: - void calculateRowsBeforeLimit() const; - /// For these fields we make accessors, because they must be calculated beforehand. mutable bool applied_limit = false; /// Whether LIMIT was applied mutable size_t rows_before_limit = 0; diff --git a/src/DataStreams/IBlockInputStream.cpp b/src/DataStreams/IBlockInputStream.cpp deleted file mode 100644 index e57d6903673..00000000000 --- a/src/DataStreams/IBlockInputStream.cpp +++ /dev/null @@ -1,359 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include - -namespace ProfileEvents -{ - extern const Event ThrottlerSleepMicroseconds; - extern const Event SelectedRows; - extern const Event SelectedBytes; -} - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int QUERY_WAS_CANCELLED; - extern const int TOO_MANY_ROWS; - extern const int TOO_MANY_BYTES; - extern const int TOO_MANY_ROWS_OR_BYTES; - extern const int LOGICAL_ERROR; -} - - -/// It's safe to access children without mutex as long as these methods are called before first call to `read()` or `readPrefix()`. - - -Block IBlockInputStream::read() -{ - if (total_rows_approx) - { - progressImpl(Progress(0, 0, total_rows_approx)); - total_rows_approx = 0; - } - - if (!info.started) - { - info.total_stopwatch.start(); - info.started = true; - } - - Block res; - - if (isCancelledOrThrowIfKilled()) - return res; - - if (!checkTimeLimit()) - limit_exceeded_need_break = true; - - if (!limit_exceeded_need_break) - res = readImpl(); - - if (res) - { - info.update(res); - - if (enabled_extremes) - updateExtremes(res); - - if (limits.mode == LimitsMode::LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES)) - limit_exceeded_need_break = true; - - if (quota) - checkQuota(res); - } - else - { - /** If the stream is over, then we will ask all children to abort the execution. - * This makes sense when running a query with LIMIT - * - there is a situation when all the necessary data has already been read, - * but children sources are still working, - * herewith they can work in separate threads or even remotely. - */ - cancel(false); - } - - progress(Progress(res.rows(), res.bytes())); - -#ifndef NDEBUG - if (res) - { - Block header = getHeader(); - if (header) - assertBlocksHaveEqualStructure(res, header, getName()); - } -#endif - - return res; -} - - -void IBlockInputStream::readPrefix() -{ -#ifndef NDEBUG - if (!read_prefix_is_called) - read_prefix_is_called = true; - else - throw Exception("readPrefix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR); -#endif - - readPrefixImpl(); - - forEachChild([&] (IBlockInputStream & child) - { - child.readPrefix(); - return false; - }); -} - - -void IBlockInputStream::readSuffix() -{ -#ifndef NDEBUG - if (!read_suffix_is_called) - read_suffix_is_called = true; - else - throw Exception("readSuffix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR); -#endif - - forEachChild([&] (IBlockInputStream & child) - { - child.readSuffix(); - return false; - }); - - readSuffixImpl(); -} - - -void IBlockInputStream::updateExtremes(Block & block) -{ - size_t num_columns = block.columns(); - - if (!extremes) - { - MutableColumns extremes_columns(num_columns); - - for (size_t i = 0; i < num_columns; ++i) - { - const ColumnPtr & src = block.safeGetByPosition(i).column; - - if (isColumnConst(*src)) - { - /// Equal min and max. - extremes_columns[i] = src->cloneResized(2); - } - else - { - Field min_value; - Field max_value; - - src->getExtremes(min_value, max_value); - - extremes_columns[i] = src->cloneEmpty(); - - extremes_columns[i]->insert(min_value); - extremes_columns[i]->insert(max_value); - } - } - - extremes = block.cloneWithColumns(std::move(extremes_columns)); - } - else - { - for (size_t i = 0; i < num_columns; ++i) - { - ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column; - - if (isColumnConst(*old_extremes)) - continue; - - Field min_value = (*old_extremes)[0]; - Field max_value = (*old_extremes)[1]; - - Field cur_min_value; - Field cur_max_value; - - block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value); - - if (cur_min_value < min_value) - min_value = cur_min_value; - if (cur_max_value > max_value) - max_value = cur_max_value; - - MutableColumnPtr new_extremes = old_extremes->cloneEmpty(); - - new_extremes->insert(min_value); - new_extremes->insert(max_value); - - old_extremes = std::move(new_extremes); - } - } -} - - -bool IBlockInputStream::checkTimeLimit() const -{ - return limits.speed_limits.checkTimeLimit(info.total_stopwatch, limits.timeout_overflow_mode); -} - - -void IBlockInputStream::checkQuota(Block & block) -{ - switch (limits.mode) - { - case LimitsMode::LIMITS_TOTAL: - /// Checked in `progress` method. - break; - - case LimitsMode::LIMITS_CURRENT: - { - UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds(); - quota->used({Quota::RESULT_ROWS, block.rows()}, {Quota::RESULT_BYTES, block.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed}); - prev_elapsed = total_elapsed; - break; - } - } -} - - -void IBlockInputStream::progressImpl(const Progress & value) -{ - if (progress_callback) - progress_callback(value); - - if (process_list_elem) - { - if (!process_list_elem->updateProgressIn(value)) - cancel(/* kill */ true); - - /// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers. - - ProgressValues progress = process_list_elem->getProgressIn(); - size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read); - - /** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read. - * NOTE: Maybe it makes sense to have them checked directly in ProcessList? - */ - if (limits.mode == LimitsMode::LIMITS_TOTAL) - { - if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read", - ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES)) - cancel(false); - } - - size_t total_rows = progress.total_rows_to_read; - - constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds - UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds(); - - if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds) - { - CurrentThread::updatePerformanceCounters(); - last_profile_events_update_time = total_elapsed_microseconds; - } - - limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds); - - if (quota && limits.mode == LimitsMode::LIMITS_TOTAL) - quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes}); - } - - ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows); - ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes); -} - - -void IBlockInputStream::cancel(bool kill) -{ - if (kill) - is_killed = true; - - bool old_val = false; - if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) - return; - - forEachChild([&] (IBlockInputStream & child) - { - child.cancel(kill); - return false; - }); -} - - -bool IBlockInputStream::isCancelled() const -{ - return is_cancelled; -} - -bool IBlockInputStream::isCancelledOrThrowIfKilled() const -{ - if (!is_cancelled) - return false; - if (is_killed) - throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED); - return true; -} - - -void IBlockInputStream::setProgressCallback(const ProgressCallback & callback) -{ - progress_callback = callback; - - forEachChild([&] (IBlockInputStream & child) - { - child.setProgressCallback(callback); - return false; - }); -} - - -void IBlockInputStream::setProcessListElement(QueryStatus * elem) -{ - process_list_elem = elem; - - forEachChild([&] (IBlockInputStream & child) - { - child.setProcessListElement(elem); - return false; - }); -} - - -Block IBlockInputStream::getTotals() -{ - if (totals) - return totals; - - Block res; - forEachChild([&] (IBlockInputStream & child) - { - res = child.getTotals(); - return bool(res); - }); - return res; -} - - -Block IBlockInputStream::getExtremes() -{ - if (extremes) - return extremes; - - Block res; - forEachChild([&] (IBlockInputStream & child) - { - res = child.getExtremes(); - return bool(res); - }); - return res; -} - -} diff --git a/src/DataStreams/IBlockInputStream.h b/src/DataStreams/IBlockInputStream.h deleted file mode 100644 index 0e77ba81779..00000000000 --- a/src/DataStreams/IBlockInputStream.h +++ /dev/null @@ -1,271 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ -} - -class ProcessListElement; -class EnabledQuota; -class QueryStatus; - - -/** The stream interface for reading data by blocks from the database. - * Relational operations are supposed to be done also as implementations of this interface. - * Watches out at how the source of the blocks works. - * Lets you get information for profiling: rows per second, blocks per second, megabytes per second, etc. - * Allows you to stop reading data (in nested sources). - */ -class IBlockInputStream : public TypePromotion -{ - friend struct BlockStreamProfileInfo; - -public: - IBlockInputStream() { info.parent = this; } - virtual ~IBlockInputStream() = default; - - IBlockInputStream(const IBlockInputStream &) = delete; - IBlockInputStream & operator=(const IBlockInputStream &) = delete; - - /// To output the data stream transformation tree (query execution plan). - virtual String getName() const = 0; - - /** Get data structure of the stream in a form of "header" block (it is also called "sample block"). - * Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values. - * It is guaranteed that method "read" returns blocks of exactly that structure. - */ - virtual Block getHeader() const = 0; - - virtual const BlockMissingValues & getMissingValues() const - { - static const BlockMissingValues none; - return none; - } - - /** Read next block. - * If there are no more blocks, return an empty block (for which operator `bool` returns false). - * NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously. - * This also applies for readPrefix, readSuffix. - */ - Block read(); - - /** Read something before starting all data or after the end of all data. - * In the `readSuffix` function, you can implement a finalization that can lead to an exception. - * readPrefix() must be called before the first call to read(). - * readSuffix() should be called after read() returns an empty block, or after a call to cancel(), but not during read() execution. - */ - - /** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children. - * There are cases when you do not want `readPrefix` of children to be called synchronously, in this function, - * but you want them to be called, for example, in separate threads (for parallel initialization of children). - * Then overload `readPrefix` function. - */ - virtual void readPrefix(); - - /** The default implementation calls recursively readSuffix() on all children, and then readSuffixImpl() on itself. - * If this stream calls read() in children in a separate thread, this behavior is usually incorrect: - * readSuffix() of the child can not be called at the moment when the same child's read() is executed in another thread. - * In this case, you need to override this method so that readSuffix() in children is called, for example, after connecting streams. - */ - virtual void readSuffix(); - - /// Do not allow to change the table while the blocks stream and its children are alive. - void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); } - - /// Get information about execution speed. - const BlockStreamProfileInfo & getProfileInfo() const { return info; } - - /** Get "total" values. - * The default implementation takes them from itself or from the first child source in which they are. - * The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source. - * There can be no total values - then an empty block is returned. - * - * Call this method only after all the data has been retrieved with `read`, - * otherwise there will be problems if any data at the same time is computed in another thread. - */ - virtual Block getTotals(); - - /// The same for minimums and maximums. - virtual Block getExtremes(); - - - /** Set the execution progress bar callback. - * The callback is passed to all child sources. - * By default, it is called for leaf sources, after each block. - * (But this can be overridden in the progress() method) - * The function takes the number of rows in the last block, the number of bytes in the last block. - * Note that the callback can be called from different threads. - */ - virtual void setProgressCallback(const ProgressCallback & callback); - - - /** In this method: - * - the progress callback is called; - * - the status of the query execution in ProcessList is updated; - * - checks restrictions and quotas that should be checked not within the same source, - * but over the total amount of resources spent in all sources at once (information in the ProcessList). - */ - virtual void progress(const Progress & value) - { - /// The data for progress is taken from leaf sources. - if (children.empty()) - progressImpl(value); - } - - void progressImpl(const Progress & value); - - - /** Set the pointer to the process list item. - * It is passed to all child sources. - * General information about the resources spent on the request will be written into it. - * Based on this information, the quota and some restrictions will be checked. - * This information will also be available in the SHOW PROCESSLIST request. - */ - virtual void setProcessListElement(QueryStatus * elem); - - /** Set the approximate total number of rows to read. - */ - void addTotalRowsApprox(size_t value) { total_rows_approx += value; } - - - /** Ask to abort the receipt of data as soon as possible. - * By default - just sets the flag is_cancelled and asks that all children be interrupted. - * This function can be called several times, including simultaneously from different threads. - * Have two modes: - * with kill = false only is_cancelled is set - streams will stop silently with returning some processed data. - * with kill = true also is_killed set - queries will stop with exception. - */ - virtual void cancel(bool kill); - - bool isCancelled() const; - bool isCancelledOrThrowIfKilled() const; - - /** Set limitations that checked on each block. */ - virtual void setLimits(const StreamLocalLimits & limits_) - { - limits = limits_; - } - - const StreamLocalLimits & getLimits() const - { - return limits; - } - - /** Set the quota. If you set a quota on the amount of raw data, - * then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits. - */ - virtual void setQuota(const std::shared_ptr & new_quota) - { - quota = new_quota; - } - - /// Enable calculation of minimums and maximums by the result columns. - void enableExtremes() { enabled_extremes = true; } - -protected: - /// Order is important: `table_locks` must be destroyed after `children` so that tables from - /// which child streams read are protected by the locks during the lifetime of the child streams. - std::vector table_locks; - - BlockInputStreams children; - std::shared_mutex children_mutex; - - BlockStreamProfileInfo info; - std::atomic is_cancelled{false}; - std::atomic is_killed{false}; - ProgressCallback progress_callback; - QueryStatus * process_list_elem = nullptr; - /// According to total_stopwatch in microseconds - UInt64 last_profile_events_update_time = 0; - - /// Additional information that can be generated during the work process. - - /// Total values during aggregation. - Block totals; - /// Minimums and maximums. The first row of the block - minimums, the second - the maximums. - Block extremes; - - - void addChild(const BlockInputStreamPtr & child) - { - std::unique_lock lock(children_mutex); - children.push_back(child); - } - - /** Check limits. - * But only those that can be checked within each separate stream. - */ - bool checkTimeLimit() const; - -#ifndef NDEBUG - bool read_prefix_is_called = false; - bool read_suffix_is_called = false; -#endif - -private: - bool enabled_extremes = false; - - /// The limit on the number of rows/bytes has been exceeded, and you need to stop execution on the next `read` call, as if the thread has run out. - bool limit_exceeded_need_break = false; - - /// Limitations and quotas. - - StreamLocalLimits limits; - - std::shared_ptr quota; /// If nullptr - the quota is not used. - UInt64 prev_elapsed = 0; - - /// The approximate total number of rows to read. For progress bar. - size_t total_rows_approx = 0; - - /// Derived classes must implement this function. - virtual Block readImpl() = 0; - - /// Here you can do a preliminary initialization. - virtual void readPrefixImpl() {} - - /// Here you need to do a finalization, which can lead to an exception. - virtual void readSuffixImpl() {} - - void updateExtremes(Block & block); - - /** Check quotas. - * But only those that can be checked within each separate stream. - */ - void checkQuota(Block & block); - - size_t checkDepthImpl(size_t max_depth, size_t level) const; - - template - void forEachChild(F && f) - { - /// NOTE: Acquire a read lock, therefore f() should be thread safe - std::shared_lock lock(children_mutex); - - // Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex. - const auto children_copy = children; - lock.unlock(); - - for (auto & child : children_copy) - if (f(*child)) - return; - } - -}; - -} diff --git a/src/DataStreams/IBlockOutputStream.h b/src/DataStreams/IBlockOutputStream.h deleted file mode 100644 index 65ebd90769d..00000000000 --- a/src/DataStreams/IBlockOutputStream.h +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once - -#include -#include -#include - -#include - -#include -#include -#include - - -namespace DB -{ - -struct Progress; - -/** Interface of stream for writing data (into table, filesystem, network, terminal, etc.) - */ -class IBlockOutputStream : private boost::noncopyable -{ -public: - IBlockOutputStream() = default; - - /** Get data structure of the stream in a form of "header" block (it is also called "sample block"). - * Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values. - * You must pass blocks of exactly this structure to the 'write' method. - */ - virtual Block getHeader() const = 0; - - /** Write block. - */ - virtual void write(const Block & block) = 0; - - /** Write or do something before all data or after all data. - */ - virtual void writePrefix() {} - virtual void writeSuffix() {} - - /** Flush output buffers if any. - */ - virtual void flush() {} - - /** Methods to set additional information for output in formats, that support it. - */ - virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {} - virtual void setTotals(const Block & /*totals*/) {} - virtual void setExtremes(const Block & /*extremes*/) {} - - /** Notify about progress. Method could be called from different threads. - * Passed value are delta, that must be summarized. - */ - virtual void onProgress(const Progress & /*progress*/) {} - - /** Content-Type to set when sending HTTP response. - */ - virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; } - - virtual ~IBlockOutputStream() = default; - - /** Don't let to alter table while instance of stream is alive. - */ - void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); } - -private: - std::vector table_locks; -}; - -} diff --git a/src/DataStreams/IBlockStream_fwd.h b/src/DataStreams/IBlockStream_fwd.h deleted file mode 100644 index d74a9528ed9..00000000000 --- a/src/DataStreams/IBlockStream_fwd.h +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ - -class IBlockInputStream; -class IBlockOutputStream; - -using BlockInputStreamPtr = std::shared_ptr; -using BlockInputStreams = std::vector; -using BlockOutputStreamPtr = std::shared_ptr; -using BlockOutputStreams = std::vector; - -} diff --git a/src/DataStreams/InternalTextLogs.h b/src/DataStreams/InternalTextLogs.h index 1312c1d327c..a8b119b0f69 100644 --- a/src/DataStreams/InternalTextLogs.h +++ b/src/DataStreams/InternalTextLogs.h @@ -1,6 +1,6 @@ #pragma once -#include #include +#include namespace DB diff --git a/src/DataStreams/NativeReader.h b/src/DataStreams/NativeReader.h index cfd58bde2cc..95b03c71764 100644 --- a/src/DataStreams/NativeReader.h +++ b/src/DataStreams/NativeReader.h @@ -1,8 +1,8 @@ #pragma once -#include #include #include +#include namespace DB { diff --git a/src/DataStreams/TemporaryFileStream.cpp b/src/DataStreams/TemporaryFileStream.cpp index 826cf5508d8..4b7c9d50fe7 100644 --- a/src/DataStreams/TemporaryFileStream.cpp +++ b/src/DataStreams/TemporaryFileStream.cpp @@ -1,8 +1,6 @@ #include -#include #include #include -#include #include #include #include diff --git a/src/DataStreams/TemporaryFileStream.h b/src/DataStreams/TemporaryFileStream.h index c0c13605928..e288b5b30fa 100644 --- a/src/DataStreams/TemporaryFileStream.h +++ b/src/DataStreams/TemporaryFileStream.h @@ -4,7 +4,6 @@ #include #include #include -#include #include namespace DB diff --git a/src/DataStreams/copyData.cpp b/src/DataStreams/copyData.cpp deleted file mode 100644 index a26052778a8..00000000000 --- a/src/DataStreams/copyData.cpp +++ /dev/null @@ -1,86 +0,0 @@ -#include -#include -#include -#include - - -namespace DB -{ - -namespace -{ - -bool isAtomicSet(std::atomic * val) -{ - return ((val != nullptr) && val->load(std::memory_order_seq_cst)); -} - -} - -template -void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCallback && is_cancelled, TProgressCallback && progress) -{ - from.readPrefix(); - to.writePrefix(); - - while (Block block = from.read()) - { - if (is_cancelled()) - break; - - to.write(block); - progress(block); - } - - if (is_cancelled()) - return; - - /// For outputting additional information in some formats. - if (from.getProfileInfo().hasAppliedLimit()) - to.setRowsBeforeLimit(from.getProfileInfo().getRowsBeforeLimit()); - - to.setTotals(from.getTotals()); - to.setExtremes(from.getExtremes()); - - if (is_cancelled()) - return; - - from.readSuffix(); - to.writeSuffix(); -} - -void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & progress, - std::atomic * is_cancelled) -{ - auto is_cancelled_pred = [is_cancelled] () - { - return isAtomicSet(is_cancelled); - }; - - copyDataImpl(from, to, is_cancelled_pred, progress); -} - -inline void doNothing(const Block &) {} - -void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic * is_cancelled) -{ - auto is_cancelled_pred = [is_cancelled] () - { - return isAtomicSet(is_cancelled); - }; - - copyDataImpl(from, to, is_cancelled_pred, doNothing); -} - -void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled) -{ - copyDataImpl(from, to, is_cancelled, doNothing); -} - -void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled, - const std::function & progress) -{ - copyDataImpl(from, to, is_cancelled, progress); -} - -} diff --git a/src/DataStreams/copyData.h b/src/DataStreams/copyData.h deleted file mode 100644 index 3dc90aed37d..00000000000 --- a/src/DataStreams/copyData.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once - -#include - -#include -#include - - -namespace DB -{ - -class Block; - -/** Copies data from the InputStream into the OutputStream - * (for example, from the database to the console, etc.) - */ -void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic * is_cancelled = nullptr); - -void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & progress, - std::atomic * is_cancelled = nullptr); - -void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled); - -void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled, - const std::function & progress); - -} diff --git a/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp index f4a5b6b5e4e..9ec8a9523c6 100644 --- a/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp @@ -14,7 +14,6 @@ # include # include # include -# include # include # include # include diff --git a/src/Dictionaries/DictionarySourceHelpers.cpp b/src/Dictionaries/DictionarySourceHelpers.cpp index cf003dceb8e..cd87cf831a2 100644 --- a/src/Dictionaries/DictionarySourceHelpers.cpp +++ b/src/Dictionaries/DictionarySourceHelpers.cpp @@ -1,7 +1,6 @@ #include "DictionarySourceHelpers.h" #include #include -#include #include #include #include "DictionaryStructure.h" diff --git a/src/Dictionaries/HTTPDictionarySource.cpp b/src/Dictionaries/HTTPDictionarySource.cpp index 6cb4d52744a..844a6357e29 100644 --- a/src/Dictionaries/HTTPDictionarySource.cpp +++ b/src/Dictionaries/HTTPDictionarySource.cpp @@ -1,5 +1,4 @@ #include "HTTPDictionarySource.h" -#include #include #include #include diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index ed9f9d52b94..34574ca13f8 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index d816ef1d016..f20cec56943 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 85ce83868c6..975075eba96 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -19,7 +19,6 @@ #include #include -#include #include #include diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 1e19c18de43..22ae459a662 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 566ee60a3e6..9d770aee159 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -35,7 +35,6 @@ #include #include -#include #include diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index c785b085a57..b6bb3c5fad5 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include diff --git a/src/Interpreters/InterpreterExistsQuery.cpp b/src/Interpreters/InterpreterExistsQuery.cpp index 6eb188bce9f..6ffeef5cc7d 100644 --- a/src/Interpreters/InterpreterExistsQuery.cpp +++ b/src/Interpreters/InterpreterExistsQuery.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 8a31917caef..6a1a8652b23 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 0d6fe34c0c2..5f44603a420 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h index 21e15bc74bb..cf24d14b737 100644 --- a/src/Interpreters/InterpreterSelectQuery.h +++ b/src/Interpreters/InterpreterSelectQuery.h @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -52,13 +51,6 @@ public: const SelectQueryOptions &, const Names & required_result_column_names_ = Names{}); - /// Read data not from the table specified in the query, but from the prepared source `input`. - InterpreterSelectQuery( - const ASTPtr & query_ptr_, - ContextPtr context_, - const BlockInputStreamPtr & input_, - const SelectQueryOptions & = {}); - /// Read data not from the table specified in the query, but from the prepared pipe `input`. InterpreterSelectQuery( const ASTPtr & query_ptr_, diff --git a/src/Interpreters/InterpreterShowCreateQuery.cpp b/src/Interpreters/InterpreterShowCreateQuery.cpp index adf1aae3ff3..c191a73bc71 100644 --- a/src/Interpreters/InterpreterShowCreateQuery.cpp +++ b/src/Interpreters/InterpreterShowCreateQuery.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/InterpreterWatchQuery.cpp b/src/Interpreters/InterpreterWatchQuery.cpp index bc0aeda56bd..78c4eca5ca6 100644 --- a/src/Interpreters/InterpreterWatchQuery.cpp +++ b/src/Interpreters/InterpreterWatchQuery.cpp @@ -15,7 +15,6 @@ limitations under the License. */ #include #include #include -#include #include diff --git a/src/Interpreters/InterpreterWatchQuery.h b/src/Interpreters/InterpreterWatchQuery.h index 2bc7236582a..e43ed88af2f 100644 --- a/src/Interpreters/InterpreterWatchQuery.h +++ b/src/Interpreters/InterpreterWatchQuery.h @@ -38,8 +38,6 @@ private: /// Table from where to read data, if not subquery. StoragePtr storage; - /// Streams of read data - BlockInputStreams streams; }; } diff --git a/src/Interpreters/JoinSwitcher.h b/src/Interpreters/JoinSwitcher.h index aaa7441b8a4..30115710e22 100644 --- a/src/Interpreters/JoinSwitcher.h +++ b/src/Interpreters/JoinSwitcher.h @@ -5,7 +5,6 @@ #include #include #include -#include namespace DB diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index f8402cf0287..fb9f2e25c07 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include diff --git a/src/Interpreters/SortedBlocksWriter.h b/src/Interpreters/SortedBlocksWriter.h index c65511e943e..94bebce88f7 100644 --- a/src/Interpreters/SortedBlocksWriter.h +++ b/src/Interpreters/SortedBlocksWriter.h @@ -8,7 +8,6 @@ #include #include #include -#include namespace DB diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 22cd958f4f8..02dcd95ab41 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/executeDDLQueryOnCluster.h b/src/Interpreters/executeDDLQueryOnCluster.h index f430c2364b2..0ad40dd3332 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.h +++ b/src/Interpreters/executeDDLQueryOnCluster.h @@ -4,7 +4,6 @@ #include #include #include -#include namespace zkutil diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 3f5b386d16f..0a1130c721b 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -10,8 +10,6 @@ #include #include -#include -#include #include #include diff --git a/src/Processors/Formats/OutputStreamToOutputFormat.cpp b/src/Processors/Formats/OutputStreamToOutputFormat.cpp deleted file mode 100644 index 5d4e7832327..00000000000 --- a/src/Processors/Formats/OutputStreamToOutputFormat.cpp +++ /dev/null @@ -1,43 +0,0 @@ -#include -#include - -namespace DB -{ - -Block OutputStreamToOutputFormat::getHeader() const -{ - return output_format->getPort(IOutputFormat::PortKind::Main).getHeader(); -} - -void OutputStreamToOutputFormat::write(const Block & block) -{ - output_format->write(block); -} - -void OutputStreamToOutputFormat::writePrefix() { output_format->doWritePrefix(); } -void OutputStreamToOutputFormat::writeSuffix() { output_format->doWriteSuffix(); } - -void OutputStreamToOutputFormat::flush() { output_format->flush(); } - -void OutputStreamToOutputFormat::setRowsBeforeLimit(size_t rows_before_limit) -{ - output_format->setRowsBeforeLimit(rows_before_limit); -} - -void OutputStreamToOutputFormat::setTotals(const Block & totals) -{ - if (totals) - output_format->setTotals(totals); -} - -void OutputStreamToOutputFormat::setExtremes(const Block & extremes) -{ - if (extremes) - output_format->setExtremes(extremes); -} - -void OutputStreamToOutputFormat::onProgress(const Progress & progress) { output_format->onProgress(progress); } - -std::string OutputStreamToOutputFormat::getContentType() const { return output_format->getContentType(); } - -} diff --git a/src/Processors/Formats/OutputStreamToOutputFormat.h b/src/Processors/Formats/OutputStreamToOutputFormat.h deleted file mode 100644 index a85de12b49d..00000000000 --- a/src/Processors/Formats/OutputStreamToOutputFormat.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once -#include - -namespace DB -{ - - -class IOutputFormat; - -using OutputFormatPtr = std::shared_ptr; - -/// Wrapper. Implements IBlockOutputStream interface using IOutputFormat object. -class OutputStreamToOutputFormat : public IBlockOutputStream -{ -public: - explicit OutputStreamToOutputFormat(OutputFormatPtr output_format_) : output_format(std::move(output_format_)) {} - - Block getHeader() const override; - - void write(const Block & block) override; - - void writePrefix() override; - void writeSuffix() override; - - void flush() override; - - void setRowsBeforeLimit(size_t rows_before_limit) override; - void setTotals(const Block & totals) override; - void setExtremes(const Block & extremes) override; - - void onProgress(const Progress & progress) override; - - std::string getContentType() const override; - -private: - OutputFormatPtr output_format; -}; - -} diff --git a/src/Processors/QueryPipelineBuilder.h b/src/Processors/QueryPipelineBuilder.h index 78ae5dd41be..7e0ddbc0285 100644 --- a/src/Processors/QueryPipelineBuilder.h +++ b/src/Processors/QueryPipelineBuilder.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include diff --git a/src/Processors/Transforms/CreatingSetsTransform.cpp b/src/Processors/Transforms/CreatingSetsTransform.cpp index 6776caae9bf..e6ae620e69b 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.cpp +++ b/src/Processors/Transforms/CreatingSetsTransform.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include diff --git a/src/Processors/Transforms/CreatingSetsTransform.h b/src/Processors/Transforms/CreatingSetsTransform.h index eca12c33f54..a5a67e99afc 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.h +++ b/src/Processors/Transforms/CreatingSetsTransform.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include diff --git a/src/Processors/Transforms/buildPushingToViewsChain.h b/src/Processors/Transforms/buildPushingToViewsChain.h index 75d0528ff7b..6956dedbc41 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.h +++ b/src/Processors/Transforms/buildPushingToViewsChain.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index 3720362775c..8f4f04e56c5 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index d001b12ee66..e89d82cfcc8 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -13,7 +13,6 @@ #include #include #include -#include #include "IServer.h" diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 6ce17552ba1..4ed3a43d2ed 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 690c9cbd4d0..39688060b0a 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -1,7 +1,6 @@ #include #include -#include #include #include #include diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index 4641a1631f2..be3dd9ae6c9 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -21,7 +21,6 @@ limitations under the License. */ #include #include #include -#include #include #include #include diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index 5e12d5da678..d0d3f283478 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index f4adee8c259..5dbca837f31 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index d952950e461..e64ba9edec0 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 75308f872dc..e3ca902b1bd 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -2,9 +2,9 @@ #include #include #include -#include #include #include +#include #include diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp index 9e138e9882a..947c0bbe932 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp @@ -3,7 +3,6 @@ #include "StorageMaterializedPostgreSQL.h" #include #include -#include #include #include #include diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 873a4b4860c..3796bd8ba57 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 2b4f5e4a276..cf9b557de25 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 59f250d67b8..3e8955ad864 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -2,7 +2,6 @@ #include #include -#include #include #include diff --git a/src/Storages/StorageExecutable.cpp b/src/Storages/StorageExecutable.cpp index d6e242d1a97..0f47f654428 100644 --- a/src/Storages/StorageExecutable.cpp +++ b/src/Storages/StorageExecutable.cpp @@ -11,7 +11,6 @@ #include #include -#include #include #include #include diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 9aa5689aa66..4ae55272db6 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -16,7 +16,6 @@ #include #include -#include #include #include @@ -479,8 +478,6 @@ Pipe StorageFile::read( size_t max_block_size, unsigned num_streams) { - BlockInputStreams blocks_input; - if (use_table_fd) /// need to call ctr BlockInputStream paths = {""}; /// when use fd, paths are empty else diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index 0fd94bac95a..f0c4509f188 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -16,8 +16,6 @@ #include -#include - #include #include diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 5a9e8fc2461..904d1a7f89c 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include diff --git a/src/Storages/StorageMemory.h b/src/Storages/StorageMemory.h index 846fd4af5fd..063802faf1a 100644 --- a/src/Storages/StorageMemory.h +++ b/src/Storages/StorageMemory.h @@ -8,7 +8,6 @@ #include #include -#include #include diff --git a/src/Storages/StorageMongoDB.cpp b/src/Storages/StorageMongoDB.cpp index eeb5b107b54..15430f60285 100644 --- a/src/Storages/StorageMongoDB.cpp +++ b/src/Storages/StorageMongoDB.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index 4264be9dbc2..3bdf3218b2e 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/StoragePostgreSQL.h b/src/Storages/StoragePostgreSQL.h index b4bb5400930..10a60bf9b21 100644 --- a/src/Storages/StoragePostgreSQL.h +++ b/src/Storages/StoragePostgreSQL.h @@ -8,7 +8,6 @@ #include #include #include -#include #include #include diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 416d37cd351..20e64255684 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -60,7 +60,6 @@ #include #include -#include #include diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 709c9dc4a63..d95a9465bd6 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -30,7 +30,6 @@ #include -#include #include #include #include diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index e6d41a53bfc..0e7faad194e 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/StorageSet.cpp b/src/Storages/StorageSet.cpp index 58c56f1401f..2547af1b0ad 100644 --- a/src/Storages/StorageSet.cpp +++ b/src/Storages/StorageSet.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index 2dc2577f245..0cd07afc26c 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 79f1d568057..9397986fadd 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -15,7 +15,6 @@ #include #include -#include #include #include diff --git a/src/Storages/StorageXDBC.cpp b/src/Storages/StorageXDBC.cpp index a1254e2aaeb..a0924896437 100644 --- a/src/Storages/StorageXDBC.cpp +++ b/src/Storages/StorageXDBC.cpp @@ -1,6 +1,5 @@ #include "StorageXDBC.h" -#include #include #include #include diff --git a/src/Storages/System/StorageSystemZooKeeper.cpp b/src/Storages/System/StorageSystemZooKeeper.cpp index 9aedee66b5f..f2b2102c7ff 100644 --- a/src/Storages/System/StorageSystemZooKeeper.cpp +++ b/src/Storages/System/StorageSystemZooKeeper.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include diff --git a/src/Storages/tests/gtest_storage_log.cpp b/src/Storages/tests/gtest_storage_log.cpp index e7ecfc7c4f0..5b891c43aae 100644 --- a/src/Storages/tests/gtest_storage_log.cpp +++ b/src/Storages/tests/gtest_storage_log.cpp @@ -1,7 +1,6 @@ #include #include -#include #include #include #include diff --git a/src/TableFunctions/ITableFunctionFileLike.cpp b/src/TableFunctions/ITableFunctionFileLike.cpp index afd81638da4..49461fe8f46 100644 --- a/src/TableFunctions/ITableFunctionFileLike.cpp +++ b/src/TableFunctions/ITableFunctionFileLike.cpp @@ -16,7 +16,6 @@ #include -#include namespace DB diff --git a/src/TableFunctions/TableFunctionExecutable.h b/src/TableFunctions/TableFunctionExecutable.h index 05ef2b3b26b..128ee8e46fc 100644 --- a/src/TableFunctions/TableFunctionExecutable.h +++ b/src/TableFunctions/TableFunctionExecutable.h @@ -1,6 +1,5 @@ #pragma once -#include #include namespace DB