Remove BlockInputStream interfaces.

This commit is contained in:
Nikolai Kochetov 2021-10-14 13:25:43 +03:00
parent 2957971ee3
commit ab28c6c855
79 changed files with 15 additions and 1090 deletions

View File

@ -57,14 +57,6 @@ std::shared_ptr<ASTStorage> createASTStorageDistributed(
} }
BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream)
{
return std::make_shared<SquashingBlockInputStream>(
stream,
std::numeric_limits<size_t>::max(),
std::numeric_limits<size_t>::max());
}
Block getBlockWithAllStreamData(QueryPipeline pipeline) Block getBlockWithAllStreamData(QueryPipeline pipeline)
{ {
QueryPipelineBuilder builder; QueryPipelineBuilder builder;

View File

@ -50,8 +50,6 @@
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <DataStreams/RemoteQueryExecutor.h> #include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>

View File

@ -1,7 +1,6 @@
#include "Handlers.h" #include "Handlers.h"
#include "SharedLibraryHandlerFactory.h" #include "SharedLibraryHandlerFactory.h"
#include <DataStreams/copyData.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h> #include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -10,11 +9,13 @@
#include <Poco/Net/HTTPServerResponse.h> #include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h> #include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h> #include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipeline.h> #include <Processors/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h> #include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Pipe.h>
#include <Server/HTTP/HTMLForm.h> #include <Server/HTTP/HTMLForm.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
@ -189,8 +190,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
ReadBufferFromString read_block_buf(params.get("null_values")); ReadBufferFromString read_block_buf(params.get("null_values"));
auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE); auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format); QueryPipeline pipeline(Pipe(std::move(format)));
auto sample_block_with_nulls = reader->read(); PullingPipelineExecutor executor(pipeline);
Block sample_block_with_nulls;
executor.pull(sample_block_with_nulls);
LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure()); LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());
@ -281,8 +284,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto & read_buf = request.getStream(); auto & read_buf = request.getStream();
auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE); auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format); QueryPipeline pipeline(std::move(format));
auto block = reader->read(); PullingPipelineExecutor executor(pipeline);
Block block;
executor.pull(block);
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id); auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler) if (!library_handler)

View File

@ -2,7 +2,6 @@
#include <Common/SharedLibrary.h> #include <Common/SharedLibrary.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <DataStreams/OneBlockInputStream.h>
#include "LibraryUtils.h" #include "LibraryUtils.h"

View File

@ -4,7 +4,6 @@
#include "ODBCBlockInputStream.h" #include "ODBCBlockInputStream.h"
#include "ODBCBlockOutputStream.h" #include "ODBCBlockOutputStream.h"
#include "getIdentifierQuote.h" #include "getIdentifierQuote.h"
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h> #include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>

View File

@ -6,7 +6,6 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <Core/Protocol.h> #include <Core/Protocol.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/BlockStreamProfileInfo.h> #include <DataStreams/BlockStreamProfileInfo.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>

View File

@ -1,5 +1,4 @@
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>

View File

@ -1,5 +1,4 @@
#include <DataStreams/BlockStreamProfileInfo.h> #include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/IBlockInputStream.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -47,16 +46,12 @@ void BlockStreamProfileInfo::setFrom(const BlockStreamProfileInfo & rhs, bool sk
size_t BlockStreamProfileInfo::getRowsBeforeLimit() const size_t BlockStreamProfileInfo::getRowsBeforeLimit() const
{ {
if (!calculated_rows_before_limit)
calculateRowsBeforeLimit();
return rows_before_limit; return rows_before_limit;
} }
bool BlockStreamProfileInfo::hasAppliedLimit() const bool BlockStreamProfileInfo::hasAppliedLimit() const
{ {
if (!calculated_rows_before_limit)
calculateRowsBeforeLimit();
return applied_limit; return applied_limit;
} }
@ -73,74 +68,4 @@ void BlockStreamProfileInfo::update(size_t num_rows, size_t num_bytes)
bytes += num_bytes; bytes += num_bytes;
} }
void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const
{
if (!parent)
return;
if (parent->getName() == name)
{
res.push_back(this);
return;
}
parent->forEachChild([&] (IBlockInputStream & child)
{
child.getProfileInfo().collectInfosForStreamsWithName(name, res);
return false;
});
}
void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
{
calculated_rows_before_limit = true;
/// is there a Limit?
BlockStreamProfileInfos limits;
collectInfosForStreamsWithName("Limit", limits);
if (!limits.empty())
{
applied_limit = true;
/** Take the number of lines read below `PartialSorting`, if any, or below `Limit`.
* This is necessary, because sorting can return only part of the rows.
*/
BlockStreamProfileInfos partial_sortings;
collectInfosForStreamsWithName("PartialSorting", partial_sortings);
BlockStreamProfileInfos & limits_or_sortings = partial_sortings.empty() ? limits : partial_sortings;
for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
{
info_limit_or_sort->parent->forEachChild([&] (IBlockInputStream & child)
{
rows_before_limit += child.getProfileInfo().rows;
return false;
});
}
}
else
{
/// Then the data about `rows_before_limit` can be in `RemoteBlockInputStream` (come from a remote server).
BlockStreamProfileInfos remotes;
collectInfosForStreamsWithName("Remote", remotes);
collectInfosForStreamsWithName("TreeExecutor", remotes);
if (remotes.empty())
return;
for (const auto & info : remotes)
{
if (info->applied_limit)
{
applied_limit = true;
rows_before_limit += info->rows_before_limit;
}
}
}
}
} }

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <base/types.h> #include <base/types.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <vector> #include <vector>
@ -16,9 +15,6 @@ class WriteBuffer;
/// Information for profiling. See IBlockInputStream.h /// Information for profiling. See IBlockInputStream.h
struct BlockStreamProfileInfo struct BlockStreamProfileInfo
{ {
/// Info about stream object this profile info refers to.
IBlockInputStream * parent = nullptr;
bool started = false; bool started = false;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time
@ -28,9 +24,6 @@ struct BlockStreamProfileInfo
using BlockStreamProfileInfos = std::vector<const BlockStreamProfileInfo *>; using BlockStreamProfileInfos = std::vector<const BlockStreamProfileInfo *>;
/// Collect BlockStreamProfileInfo for the nearest sources in the tree named `name`. Example; collect all info for PartialSorting streams.
void collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const;
/** Get the number of rows if there were no LIMIT. /** Get the number of rows if there were no LIMIT.
* If there is no LIMIT, 0 is returned. * If there is no LIMIT, 0 is returned.
* If the query does not contain ORDER BY, the number can be underestimated - return the number of rows in blocks that were read before LIMIT reached. * If the query does not contain ORDER BY, the number can be underestimated - return the number of rows in blocks that were read before LIMIT reached.
@ -59,8 +52,6 @@ struct BlockStreamProfileInfo
} }
private: private:
void calculateRowsBeforeLimit() const;
/// For these fields we make accessors, because they must be calculated beforehand. /// For these fields we make accessors, because they must be calculated beforehand.
mutable bool applied_limit = false; /// Whether LIMIT was applied mutable bool applied_limit = false; /// Whether LIMIT was applied
mutable size_t rows_before_limit = 0; mutable size_t rows_before_limit = 0;

View File

@ -1,359 +0,0 @@
#include <DataStreams/IBlockInputStream.h>
#include <Core/Field.h>
#include <Interpreters/ProcessList.h>
#include <Access/EnabledQuota.h>
#include <Common/CurrentThread.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
extern const Event SelectedRows;
extern const Event SelectedBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_WAS_CANCELLED;
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int LOGICAL_ERROR;
}
/// It's safe to access children without mutex as long as these methods are called before first call to `read()` or `readPrefix()`.
Block IBlockInputStream::read()
{
if (total_rows_approx)
{
progressImpl(Progress(0, 0, total_rows_approx));
total_rows_approx = 0;
}
if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}
Block res;
if (isCancelledOrThrowIfKilled())
return res;
if (!checkTimeLimit())
limit_exceeded_need_break = true;
if (!limit_exceeded_need_break)
res = readImpl();
if (res)
{
info.update(res);
if (enabled_extremes)
updateExtremes(res);
if (limits.mode == LimitsMode::LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;
if (quota)
checkQuota(res);
}
else
{
/** If the stream is over, then we will ask all children to abort the execution.
* This makes sense when running a query with LIMIT
* - there is a situation when all the necessary data has already been read,
* but children sources are still working,
* herewith they can work in separate threads or even remotely.
*/
cancel(false);
}
progress(Progress(res.rows(), res.bytes()));
#ifndef NDEBUG
if (res)
{
Block header = getHeader();
if (header)
assertBlocksHaveEqualStructure(res, header, getName());
}
#endif
return res;
}
void IBlockInputStream::readPrefix()
{
#ifndef NDEBUG
if (!read_prefix_is_called)
read_prefix_is_called = true;
else
throw Exception("readPrefix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
#endif
readPrefixImpl();
forEachChild([&] (IBlockInputStream & child)
{
child.readPrefix();
return false;
});
}
void IBlockInputStream::readSuffix()
{
#ifndef NDEBUG
if (!read_suffix_is_called)
read_suffix_is_called = true;
else
throw Exception("readSuffix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
#endif
forEachChild([&] (IBlockInputStream & child)
{
child.readSuffix();
return false;
});
readSuffixImpl();
}
void IBlockInputStream::updateExtremes(Block & block)
{
size_t num_columns = block.columns();
if (!extremes)
{
MutableColumns extremes_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnPtr & src = block.safeGetByPosition(i).column;
if (isColumnConst(*src))
{
/// Equal min and max.
extremes_columns[i] = src->cloneResized(2);
}
else
{
Field min_value;
Field max_value;
src->getExtremes(min_value, max_value);
extremes_columns[i] = src->cloneEmpty();
extremes_columns[i]->insert(min_value);
extremes_columns[i]->insert(max_value);
}
}
extremes = block.cloneWithColumns(std::move(extremes_columns));
}
else
{
for (size_t i = 0; i < num_columns; ++i)
{
ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
if (isColumnConst(*old_extremes))
continue;
Field min_value = (*old_extremes)[0];
Field max_value = (*old_extremes)[1];
Field cur_min_value;
Field cur_max_value;
block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
if (cur_min_value < min_value)
min_value = cur_min_value;
if (cur_max_value > max_value)
max_value = cur_max_value;
MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
new_extremes->insert(min_value);
new_extremes->insert(max_value);
old_extremes = std::move(new_extremes);
}
}
}
bool IBlockInputStream::checkTimeLimit() const
{
return limits.speed_limits.checkTimeLimit(info.total_stopwatch, limits.timeout_overflow_mode);
}
void IBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)
{
case LimitsMode::LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LimitsMode::LIMITS_CURRENT:
{
UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds();
quota->used({Quota::RESULT_ROWS, block.rows()}, {Quota::RESULT_BYTES, block.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed});
prev_elapsed = total_elapsed;
break;
}
}
}
void IBlockInputStream::progressImpl(const Progress & value)
{
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel(/* kill */ true);
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LimitsMode::LIMITS_TOTAL)
{
if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
cancel(false);
}
size_t total_rows = progress.total_rows_to_read;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota && limits.mode == LimitsMode::LIMITS_TOTAL)
quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes});
}
ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes);
}
void IBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
forEachChild([&] (IBlockInputStream & child)
{
child.cancel(kill);
return false;
});
}
bool IBlockInputStream::isCancelled() const
{
return is_cancelled;
}
bool IBlockInputStream::isCancelledOrThrowIfKilled() const
{
if (!is_cancelled)
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
void IBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
progress_callback = callback;
forEachChild([&] (IBlockInputStream & child)
{
child.setProgressCallback(callback);
return false;
});
}
void IBlockInputStream::setProcessListElement(QueryStatus * elem)
{
process_list_elem = elem;
forEachChild([&] (IBlockInputStream & child)
{
child.setProcessListElement(elem);
return false;
});
}
Block IBlockInputStream::getTotals()
{
if (totals)
return totals;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getTotals();
return bool(res);
});
return res;
}
Block IBlockInputStream::getExtremes()
{
if (extremes)
return extremes;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getExtremes();
return bool(res);
});
return res;
}
}

View File

@ -1,271 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/ExecutionSpeedLimits.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/StreamLocalLimits.h>
#include <IO/Progress.h>
#include <Storages/TableLockHolder.h>
#include <Common/TypePromotion.h>
#include <atomic>
#include <shared_mutex>
namespace DB
{
namespace ErrorCodes
{
}
class ProcessListElement;
class EnabledQuota;
class QueryStatus;
/** The stream interface for reading data by blocks from the database.
* Relational operations are supposed to be done also as implementations of this interface.
* Watches out at how the source of the blocks works.
* Lets you get information for profiling: rows per second, blocks per second, megabytes per second, etc.
* Allows you to stop reading data (in nested sources).
*/
class IBlockInputStream : public TypePromotion<IBlockInputStream>
{
friend struct BlockStreamProfileInfo;
public:
IBlockInputStream() { info.parent = this; }
virtual ~IBlockInputStream() = default;
IBlockInputStream(const IBlockInputStream &) = delete;
IBlockInputStream & operator=(const IBlockInputStream &) = delete;
/// To output the data stream transformation tree (query execution plan).
virtual String getName() const = 0;
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* It is guaranteed that method "read" returns blocks of exactly that structure.
*/
virtual Block getHeader() const = 0;
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
return none;
}
/** Read next block.
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
* This also applies for readPrefix, readSuffix.
*/
Block read();
/** Read something before starting all data or after the end of all data.
* In the `readSuffix` function, you can implement a finalization that can lead to an exception.
* readPrefix() must be called before the first call to read().
* readSuffix() should be called after read() returns an empty block, or after a call to cancel(), but not during read() execution.
*/
/** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.
* There are cases when you do not want `readPrefix` of children to be called synchronously, in this function,
* but you want them to be called, for example, in separate threads (for parallel initialization of children).
* Then overload `readPrefix` function.
*/
virtual void readPrefix();
/** The default implementation calls recursively readSuffix() on all children, and then readSuffixImpl() on itself.
* If this stream calls read() in children in a separate thread, this behavior is usually incorrect:
* readSuffix() of the child can not be called at the moment when the same child's read() is executed in another thread.
* In this case, you need to override this method so that readSuffix() in children is called, for example, after connecting streams.
*/
virtual void readSuffix();
/// Do not allow to change the table while the blocks stream and its children are alive.
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
/** Get "total" values.
* The default implementation takes them from itself or from the first child source in which they are.
* The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source.
* There can be no total values - then an empty block is returned.
*
* Call this method only after all the data has been retrieved with `read`,
* otherwise there will be problems if any data at the same time is computed in another thread.
*/
virtual Block getTotals();
/// The same for minimums and maximums.
virtual Block getExtremes();
/** Set the execution progress bar callback.
* The callback is passed to all child sources.
* By default, it is called for leaf sources, after each block.
* (But this can be overridden in the progress() method)
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
virtual void setProgressCallback(const ProgressCallback & callback);
/** In this method:
* - the progress callback is called;
* - the status of the query execution in ProcessList is updated;
* - checks restrictions and quotas that should be checked not within the same source,
* but over the total amount of resources spent in all sources at once (information in the ProcessList).
*/
virtual void progress(const Progress & value)
{
/// The data for progress is taken from leaf sources.
if (children.empty())
progressImpl(value);
}
void progressImpl(const Progress & value);
/** Set the pointer to the process list item.
* It is passed to all child sources.
* General information about the resources spent on the request will be written into it.
* Based on this information, the quota and some restrictions will be checked.
* This information will also be available in the SHOW PROCESSLIST request.
*/
virtual void setProcessListElement(QueryStatus * elem);
/** Set the approximate total number of rows to read.
*/
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
/** Ask to abort the receipt of data as soon as possible.
* By default - just sets the flag is_cancelled and asks that all children be interrupted.
* This function can be called several times, including simultaneously from different threads.
* Have two modes:
* with kill = false only is_cancelled is set - streams will stop silently with returning some processed data.
* with kill = true also is_killed set - queries will stop with exception.
*/
virtual void cancel(bool kill);
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** Set limitations that checked on each block. */
virtual void setLimits(const StreamLocalLimits & limits_)
{
limits = limits_;
}
const StreamLocalLimits & getLimits() const
{
return limits;
}
/** Set the quota. If you set a quota on the amount of raw data,
* then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
*/
virtual void setQuota(const std::shared_ptr<const EnabledQuota> & new_quota)
{
quota = new_quota;
}
/// Enable calculation of minimums and maximums by the result columns.
void enableExtremes() { enabled_extremes = true; }
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
std::vector<TableLockHolder> table_locks;
BlockInputStreams children;
std::shared_mutex children_mutex;
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// According to total_stopwatch in microseconds
UInt64 last_profile_events_update_time = 0;
/// Additional information that can be generated during the work process.
/// Total values during aggregation.
Block totals;
/// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
Block extremes;
void addChild(const BlockInputStreamPtr & child)
{
std::unique_lock lock(children_mutex);
children.push_back(child);
}
/** Check limits.
* But only those that can be checked within each separate stream.
*/
bool checkTimeLimit() const;
#ifndef NDEBUG
bool read_prefix_is_called = false;
bool read_suffix_is_called = false;
#endif
private:
bool enabled_extremes = false;
/// The limit on the number of rows/bytes has been exceeded, and you need to stop execution on the next `read` call, as if the thread has run out.
bool limit_exceeded_need_break = false;
/// Limitations and quotas.
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota; /// If nullptr - the quota is not used.
UInt64 prev_elapsed = 0;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
/// Derived classes must implement this function.
virtual Block readImpl() = 0;
/// Here you can do a preliminary initialization.
virtual void readPrefixImpl() {}
/// Here you need to do a finalization, which can lead to an exception.
virtual void readSuffixImpl() {}
void updateExtremes(Block & block);
/** Check quotas.
* But only those that can be checked within each separate stream.
*/
void checkQuota(Block & block);
size_t checkDepthImpl(size_t max_depth, size_t level) const;
template <typename F>
void forEachChild(F && f)
{
/// NOTE: Acquire a read lock, therefore f() should be thread safe
std::shared_lock lock(children_mutex);
// Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex.
const auto children_copy = children;
lock.unlock();
for (auto & child : children_copy)
if (f(*child))
return;
}
};
}

View File

@ -1,70 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Storages/TableLockHolder.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <string>
#include <vector>
namespace DB
{
struct Progress;
/** Interface of stream for writing data (into table, filesystem, network, terminal, etc.)
*/
class IBlockOutputStream : private boost::noncopyable
{
public:
IBlockOutputStream() = default;
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* You must pass blocks of exactly this structure to the 'write' method.
*/
virtual Block getHeader() const = 0;
/** Write block.
*/
virtual void write(const Block & block) = 0;
/** Write or do something before all data or after all data.
*/
virtual void writePrefix() {}
virtual void writeSuffix() {}
/** Flush output buffers if any.
*/
virtual void flush() {}
/** Methods to set additional information for output in formats, that support it.
*/
virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
virtual void setTotals(const Block & /*totals*/) {}
virtual void setExtremes(const Block & /*extremes*/) {}
/** Notify about progress. Method could be called from different threads.
* Passed value are delta, that must be summarized.
*/
virtual void onProgress(const Progress & /*progress*/) {}
/** Content-Type to set when sending HTTP response.
*/
virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
virtual ~IBlockOutputStream() = default;
/** Don't let to alter table while instance of stream is alive.
*/
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
private:
std::vector<TableLockHolder> table_locks;
};
}

View File

@ -1,17 +0,0 @@
#pragma once
#include <memory>
#include <vector>
namespace DB
{
class IBlockInputStream;
class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
using BlockOutputStreams = std::vector<BlockOutputStreamPtr>;
}

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <Core/Block.h>
namespace DB namespace DB

View File

@ -1,8 +1,8 @@
#pragma once #pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/MarkInCompressedFile.h> #include <DataStreams/MarkInCompressedFile.h>
#include <Common/PODArray.h> #include <Common/PODArray.h>
#include <Core/Block.h>
namespace DB namespace DB
{ {

View File

@ -1,8 +1,6 @@
#include <DataStreams/TemporaryFileStream.h> #include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeReader.h> #include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h> #include <DataStreams/NativeWriter.h>
#include <DataStreams/copyData.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h> #include <Processors/ISource.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>

View File

@ -4,7 +4,6 @@
#include <Processors/QueryPipelineBuilder.h> #include <Processors/QueryPipelineBuilder.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/NativeReader.h> #include <DataStreams/NativeReader.h>
namespace DB namespace DB

View File

@ -1,86 +0,0 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Common/ThreadPool.h>
namespace DB
{
namespace
{
bool isAtomicSet(std::atomic<bool> * val)
{
return ((val != nullptr) && val->load(std::memory_order_seq_cst));
}
}
template <typename TCancelCallback, typename TProgressCallback>
void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCallback && is_cancelled, TProgressCallback && progress)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
{
if (is_cancelled())
break;
to.write(block);
progress(block);
}
if (is_cancelled())
return;
/// For outputting additional information in some formats.
if (from.getProfileInfo().hasAppliedLimit())
to.setRowsBeforeLimit(from.getProfileInfo().getRowsBeforeLimit());
to.setTotals(from.getTotals());
to.setExtremes(from.getExtremes());
if (is_cancelled())
return;
from.readSuffix();
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, progress);
}
inline void doNothing(const Block &) {}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, doNothing);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled, doNothing);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress)
{
copyDataImpl(from, to, is_cancelled, progress);
}
}

View File

@ -1,27 +0,0 @@
#pragma once
#include <DataStreams/IBlockStream_fwd.h>
#include <atomic>
#include <functional>
namespace DB
{
class Block;
/** Copies data from the InputStream into the OutputStream
* (for example, from the database to the console, etc.)
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress);
}

View File

@ -14,7 +14,6 @@
# include <Processors/Executors/CompletedPipelineExecutor.h> # include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Processors/Sources/SourceFromSingleChunk.h> # include <Processors/Sources/SourceFromSingleChunk.h>
# include <Processors/Transforms/CountingTransform.h> # include <Processors/Transforms/CountingTransform.h>
# include <DataStreams/copyData.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h> # include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Databases/MySQL/MaterializeMetadata.h> # include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLSource.h> # include <Formats/MySQLSource.h>

View File

@ -1,7 +1,6 @@
#include "DictionarySourceHelpers.h" #include "DictionarySourceHelpers.h"
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Core/ColumnWithTypeAndName.h> #include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include "DictionaryStructure.h" #include "DictionaryStructure.h"

View File

@ -1,5 +1,4 @@
#include "HTTPDictionarySource.h" #include "HTTPDictionarySource.h"
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/formatBlock.h> #include <DataStreams/formatBlock.h>
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>

View File

@ -7,7 +7,6 @@
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h> #include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h> #include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h> #include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h> #include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h> #include <Processors/Formats/Impl/ParallelParsingInputFormat.h>

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>

View File

@ -19,7 +19,6 @@
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <Disks/SingleDiskVolume.h> #include <Disks/SingleDiskVolume.h>

View File

@ -5,7 +5,6 @@
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Core/UUID.h> #include <Core/UUID.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ClientInfo.h> #include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>

View File

@ -35,7 +35,6 @@
#include <Storages/StorageDictionary.h> #include <Storages/StorageDictionary.h>
#include <Storages/StorageJoin.h> #include <Storages/StorageJoin.h>
#include <DataStreams/copyData.h>
#include <Dictionaries/DictionaryStructure.h> #include <Dictionaries/DictionaryStructure.h>

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/AggregateDescription.h> #include <Interpreters/AggregateDescription.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/SubqueryForSet.h> #include <Interpreters/SubqueryForSet.h>

View File

@ -2,7 +2,6 @@
#include <Parsers/TablePropertiesQueriesASTs.h> #include <Parsers/TablePropertiesQueriesASTs.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h> #include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>

View File

@ -3,7 +3,6 @@
#include <Access/AccessFlags.h> #include <Access/AccessFlags.h>
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Processors/Transforms/buildPushingToViewsChain.h> #include <Processors/Transforms/buildPushingToViewsChain.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h> #include <Interpreters/InterpreterSelectWithUnionQuery.h>

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlockIO.h> #include <DataStreams/BlockIO.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Parsers/ASTInsertQuery.h> #include <Parsers/ASTInsertQuery.h>

View File

@ -3,7 +3,6 @@
#include <memory> #include <memory>
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h> #include <Interpreters/IInterpreterUnionOrSelectQuery.h>
@ -52,13 +51,6 @@ public:
const SelectQueryOptions &, const SelectQueryOptions &,
const Names & required_result_column_names_ = Names{}); const Names & required_result_column_names_ = Names{});
/// Read data not from the table specified in the query, but from the prepared source `input`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const BlockInputStreamPtr & input_,
const SelectQueryOptions & = {});
/// Read data not from the table specified in the query, but from the prepared pipe `input`. /// Read data not from the table specified in the query, but from the prepared pipe `input`.
InterpreterSelectQuery( InterpreterSelectQuery(
const ASTPtr & query_ptr_, const ASTPtr & query_ptr_,

View File

@ -3,7 +3,6 @@
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h> #include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>

View File

@ -15,7 +15,6 @@ limitations under the License. */
#include <Interpreters/InterpreterWatchQuery.h> #include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Access/AccessFlags.h> #include <Access/AccessFlags.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/StreamLocalLimits.h> #include <DataStreams/StreamLocalLimits.h>

View File

@ -38,8 +38,6 @@ private:
/// Table from where to read data, if not subquery. /// Table from where to read data, if not subquery.
StoragePtr storage; StoragePtr storage;
/// Streams of read data
BlockInputStreams streams;
}; };
} }

View File

@ -5,7 +5,6 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <Interpreters/IJoin.h> #include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h> #include <Interpreters/TableJoin.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB namespace DB

View File

@ -11,7 +11,6 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <DataStreams/IBlockInputStream.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <chrono> #include <chrono>

View File

@ -8,7 +8,6 @@
#include <Core/SortDescription.h> #include <Core/SortDescription.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB namespace DB

View File

@ -7,7 +7,6 @@
#include <Interpreters/IJoin.h> #include <Interpreters/IJoin.h>
#include <Interpreters/join_common.h> #include <Interpreters/join_common.h>
#include <Interpreters/asof.h> #include <Interpreters/asof.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <DataTypes/getLeastSupertype.h> #include <DataTypes/getLeastSupertype.h>
#include <Storages/IStorage_fwd.h> #include <Storages/IStorage_fwd.h>

View File

@ -4,7 +4,6 @@
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
#include <DataStreams/IBlockInputStream.h>
namespace zkutil namespace zkutil

View File

@ -10,8 +10,6 @@
#include <IO/copyData.h> #include <IO/copyData.h>
#include <DataStreams/BlockIO.h> #include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Transforms/CountingTransform.h> #include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h> #include <Processors/Transforms/getSourceFromASTInsertQuery.h>

View File

@ -1,43 +0,0 @@
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
namespace DB
{
Block OutputStreamToOutputFormat::getHeader() const
{
return output_format->getPort(IOutputFormat::PortKind::Main).getHeader();
}
void OutputStreamToOutputFormat::write(const Block & block)
{
output_format->write(block);
}
void OutputStreamToOutputFormat::writePrefix() { output_format->doWritePrefix(); }
void OutputStreamToOutputFormat::writeSuffix() { output_format->doWriteSuffix(); }
void OutputStreamToOutputFormat::flush() { output_format->flush(); }
void OutputStreamToOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)
{
output_format->setRowsBeforeLimit(rows_before_limit);
}
void OutputStreamToOutputFormat::setTotals(const Block & totals)
{
if (totals)
output_format->setTotals(totals);
}
void OutputStreamToOutputFormat::setExtremes(const Block & extremes)
{
if (extremes)
output_format->setExtremes(extremes);
}
void OutputStreamToOutputFormat::onProgress(const Progress & progress) { output_format->onProgress(progress); }
std::string OutputStreamToOutputFormat::getContentType() const { return output_format->getContentType(); }
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
/// Wrapper. Implements IBlockOutputStream interface using IOutputFormat object.
class OutputStreamToOutputFormat : public IBlockOutputStream
{
public:
explicit OutputStreamToOutputFormat(OutputFormatPtr output_format_) : output_format(std::move(output_format_)) {}
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;
void flush() override;
void setRowsBeforeLimit(size_t rows_before_limit) override;
void setTotals(const Block & totals) override;
void setExtremes(const Block & extremes) override;
void onProgress(const Progress & progress) override;
std::string getContentType() const override;
private:
OutputFormatPtr output_format;
};
}

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h> #include <Processors/Executors/PipelineExecutor.h>
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>

View File

@ -1,7 +1,6 @@
#include <Processors/Transforms/CreatingSetsTransform.h> #include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Executors/PushingPipelineExecutor.h> #include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Set.h> #include <Interpreters/Set.h>
#include <Interpreters/IJoin.h> #include <Interpreters/IJoin.h>

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Interpreters/SubqueryForSet.h> #include <Interpreters/SubqueryForSet.h>
#include <Processors/IAccumulatingTransform.h> #include <Processors/IAccumulatingTransform.h>

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/QueryViewsLog.h> #include <Interpreters/QueryViewsLog.h>
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
#include <Processors/Chain.h> #include <Processors/Chain.h>

View File

@ -7,7 +7,6 @@
#include <Core/MySQL/PacketsConnection.h> #include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h> #include <Core/MySQL/PacketsProtocolText.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <DataStreams/copyData.h>
#include <Interpreters/Session.h> #include <Interpreters/Session.h>
#include <Interpreters/executeQuery.h> #include <Interpreters/executeQuery.h>
#include <IO/copyData.h> #include <IO/copyData.h>

View File

@ -13,7 +13,6 @@
#include <Interpreters/InternalTextLogsQueue.h> #include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <DataStreams/NativeReader.h> #include <DataStreams/NativeReader.h>
#include <DataStreams/IBlockStream_fwd.h>
#include "IServer.h" #include "IServer.h"

View File

@ -2,7 +2,6 @@
#include <Core/Names.h> #include <Core/Names.h>
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Interpreters/CancellationCode.h> #include <Interpreters/CancellationCode.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>

View File

@ -1,7 +1,6 @@
#include <Storages/Kafka/StorageKafka.h> #include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/parseSyslogLevel.h> #include <Storages/Kafka/parseSyslogLevel.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h> #include <DataTypes/DataTypeDateTime64.h>

View File

@ -21,7 +21,6 @@ limitations under the License. */
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h> #include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/ExpressionTransform.h> #include <Processors/Transforms/ExpressionTransform.h>
#include <DataStreams/copyData.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>

View File

@ -5,7 +5,6 @@
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h> #include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h> #include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>

View File

@ -4,7 +4,6 @@
#include <Backups/BackupEntryFromSmallFile.h> #include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/IBackup.h> #include <Backups/IBackup.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>

View File

@ -6,7 +6,6 @@
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h> #include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h> #include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>

View File

@ -2,9 +2,9 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h> #include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h> #include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Interpreters/PartLog.h> #include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h> #include <Common/ZooKeeper/KeeperException.h>
#include <Core/Block.h>
#include <IO/Operators.h> #include <IO/Operators.h>

View File

@ -3,7 +3,6 @@
#include "StorageMaterializedPostgreSQL.h" #include "StorageMaterializedPostgreSQL.h"
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Common/hex.h> #include <Common/hex.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h> #include <Interpreters/InterpreterInsertQuery.h>

View File

@ -9,7 +9,6 @@
#include <Interpreters/InterpreterRenameQuery.h> #include <Interpreters/InterpreterRenameQuery.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <DataStreams/copyData.h>
#include <Databases/DatabaseOnDisk.h> #include <Databases/DatabaseOnDisk.h>

View File

@ -1,5 +1,4 @@
#include <amqpcpp.h> #include <amqpcpp.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>

View File

@ -2,7 +2,6 @@
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <base/shared_ptr_helper.h> #include <base/shared_ptr_helper.h>

View File

@ -11,7 +11,6 @@
#include <Parsers/ASTSelectWithUnionQuery.h> #include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/ISimpleTransform.h> #include <Processors/ISimpleTransform.h>
#include <Processors/Executors/CompletedPipelineExecutor.h> #include <Processors/Executors/CompletedPipelineExecutor.h>

View File

@ -16,7 +16,6 @@
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
@ -479,8 +478,6 @@ Pipe StorageFile::read(
size_t max_block_size, size_t max_block_size,
unsigned num_streams) unsigned num_streams)
{ {
BlockInputStreams blocks_input;
if (use_table_fd) /// need to call ctr BlockInputStream if (use_table_fd) /// need to call ctr BlockInputStream
paths = {""}; /// when use fd, paths are empty paths = {""}; /// when use fd, paths are empty
else else

View File

@ -16,8 +16,6 @@
#include <DataTypes/NestedUtils.h> #include <DataTypes/NestedUtils.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>

View File

@ -12,7 +12,6 @@
#include <Interpreters/AddDefaultDatabaseVisitor.h> #include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/getHeaderForProcessingStage.h> #include <Interpreters/getHeaderForProcessingStage.h>
#include <Access/AccessFlags.h> #include <Access/AccessFlags.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/AlterCommands.h> #include <Storages/AlterCommands.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>

View File

@ -8,7 +8,6 @@
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/MultiVersion.h> #include <Common/MultiVersion.h>

View File

@ -9,7 +9,6 @@
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/parseAddress.h> #include <Common/parseAddress.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>

View File

@ -9,7 +9,6 @@
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Common/parseAddress.h> #include <Common/parseAddress.h>

View File

@ -8,7 +8,6 @@
#include <base/shared_ptr_helper.h> #include <base/shared_ptr_helper.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Core/PostgreSQL/PoolWithFailover.h> #include <Core/PostgreSQL/PoolWithFailover.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/ExternalDataSourceConfiguration.h>

View File

@ -60,7 +60,6 @@
#include <Interpreters/DDLTask.h> #include <Interpreters/DDLTask.h>
#include <Interpreters/InterserverCredentials.h> #include <Interpreters/InterserverCredentials.h>
#include <DataStreams/copyData.h>
#include <Poco/DirectoryIterator.h> #include <Poco/DirectoryIterator.h>

View File

@ -30,7 +30,6 @@
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>

View File

@ -22,7 +22,6 @@
#include <Interpreters/SelectQueryOptions.h> #include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h> #include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h> #include <Interpreters/getTableExpressions.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
#include <DataStreams/narrowBlockInputStreams.h> #include <DataStreams/narrowBlockInputStreams.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>

View File

@ -6,6 +6,7 @@
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/NativeWriter.h> #include <DataStreams/NativeWriter.h>
#include <DataStreams/NativeReader.h> #include <DataStreams/NativeReader.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>

View File

@ -14,7 +14,6 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/NativeReader.h> #include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h> #include <DataStreams/NativeWriter.h>

View File

@ -15,7 +15,6 @@
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Poco/Net/HTTPRequest.h> #include <Poco/Net/HTTPRequest.h>

View File

@ -1,6 +1,5 @@
#include "StorageXDBC.h" #include "StorageXDBC.h"
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>

View File

@ -15,7 +15,6 @@
#include <Parsers/ASTSubquery.h> #include <Parsers/ASTSubquery.h>
#include <Interpreters/Set.h> #include <Interpreters/Set.h>
#include <Interpreters/interpretSubquery.h> #include <Interpreters/interpretSubquery.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>

View File

@ -1,7 +1,6 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Disks/tests/gtest_disk.h> #include <Disks/tests/gtest_disk.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>

View File

@ -16,7 +16,6 @@
#include <Processors/ISource.h> #include <Processors/ISource.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB namespace DB

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <DataStreams/IBlockStream_fwd.h>
#include <TableFunctions/ITableFunction.h> #include <TableFunctions/ITableFunction.h>
namespace DB namespace DB