Merge pull request #30171 from ClickHouse/remove-stream-interfaces

Remove stream interfaces
Nikolai Kochetov 2021-10-16 09:34:01 +03:00 committed by GitHub
commit c668696047
121 changed files with 85 additions and 1968 deletions
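The change replaces the pull-based IBlockInputStream / IBlockOutputStream hierarchy with QueryPipeline plus executors. The recurring migration pattern across the files below is roughly the following (a minimal sketch assembled from the hunks; `process` is a hypothetical placeholder for the caller's per-block logic):

// Before: wrap a source in a stream and call read() until it returns an empty block.
//     auto reader = std::make_shared<InputStreamFromInputFormat>(format);
//     while (Block block = reader->read())
//         process(block);

// After: the source becomes a processor inside a QueryPipeline, pulled through an executor.
QueryPipeline pipeline(Pipe(std::move(format)));
PullingPipelineExecutor executor(pipeline);

Block block;
while (executor.pull(block)) /// pull() returns false once the pipeline is exhausted
    process(block);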

View File

@ -57,14 +57,6 @@ std::shared_ptr<ASTStorage> createASTStorageDistributed(
}
BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream)
{
return std::make_shared<SquashingBlockInputStream>(
stream,
std::numeric_limits<size_t>::max(),
std::numeric_limits<size_t>::max());
}
Block getBlockWithAllStreamData(QueryPipeline pipeline)
{
QueryPipelineBuilder builder;
@ -82,7 +74,6 @@ Block getBlockWithAllStreamData(QueryPipeline pipeline)
return block;
}
bool isExtendedDefinitionStorage(const ASTPtr & storage_ast)
{
const auto & storage = storage_ast->as<ASTStorage &>();

View File

@ -50,8 +50,6 @@
#include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>

View File

@ -1,7 +1,6 @@
#include "Handlers.h"
#include "SharedLibraryHandlerFactory.h"
#include <DataStreams/copyData.h>
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
@ -10,11 +9,13 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Pipe.h>
#include <Server/HTTP/HTMLForm.h>
#include <IO/ReadBufferFromString.h>
@ -189,8 +190,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
ReadBufferFromString read_block_buf(params.get("null_values"));
auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto sample_block_with_nulls = reader->read();
QueryPipeline pipeline(Pipe(std::move(format)));
PullingPipelineExecutor executor(pipeline);
Block sample_block_with_nulls;
executor.pull(sample_block_with_nulls);
LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());
@ -281,8 +284,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto & read_buf = request.getStream();
auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read();
QueryPipeline pipeline(std::move(format));
PullingPipelineExecutor executor(pipeline);
Block block;
executor.pull(block);
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)

View File

@ -2,7 +2,6 @@
#include <Common/SharedLibrary.h>
#include <base/logger_useful.h>
#include <DataStreams/OneBlockInputStream.h>
#include "LibraryUtils.h"

View File

@ -4,7 +4,6 @@
#include "ODBCBlockInputStream.h"
#include "ODBCBlockOutputStream.h"
#include "getIdentifierQuote.h"
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
@ -15,9 +14,9 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <base/logger_useful.h>
#include <Server/HTTP/HTMLForm.h>

View File

@ -6,7 +6,6 @@
#include <Core/Block.h>
#include <Core/Protocol.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <Processors/Pipe.h>

View File

@ -1,5 +1,4 @@
#include <boost/program_options.hpp>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
@ -12,7 +11,6 @@
#include <Processors/Pipe.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Formats/IInputFormat.h>

View File

@ -1,6 +1,5 @@
#include <DataStreams/BlockIO.h>
#include <Interpreters/ProcessList.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
namespace DB
{

View File

@ -1,5 +1,4 @@
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/IBlockInputStream.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -47,16 +46,14 @@ void BlockStreamProfileInfo::setFrom(const BlockStreamProfileInfo & rhs, bool sk
size_t BlockStreamProfileInfo::getRowsBeforeLimit() const
{
if (!calculated_rows_before_limit)
calculateRowsBeforeLimit();
calculated_rows_before_limit = true;
return rows_before_limit;
}
bool BlockStreamProfileInfo::hasAppliedLimit() const
{
if (!calculated_rows_before_limit)
calculateRowsBeforeLimit();
calculated_rows_before_limit = true;
return applied_limit;
}
@ -73,74 +70,4 @@ void BlockStreamProfileInfo::update(size_t num_rows, size_t num_bytes)
bytes += num_bytes;
}
void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const
{
if (!parent)
return;
if (parent->getName() == name)
{
res.push_back(this);
return;
}
parent->forEachChild([&] (IBlockInputStream & child)
{
child.getProfileInfo().collectInfosForStreamsWithName(name, res);
return false;
});
}
void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
{
calculated_rows_before_limit = true;
/// is there a Limit?
BlockStreamProfileInfos limits;
collectInfosForStreamsWithName("Limit", limits);
if (!limits.empty())
{
applied_limit = true;
/** Take the number of rows read below `PartialSorting`, if any, or below `Limit`.
* This is necessary, because sorting can return only part of the rows.
*/
BlockStreamProfileInfos partial_sortings;
collectInfosForStreamsWithName("PartialSorting", partial_sortings);
BlockStreamProfileInfos & limits_or_sortings = partial_sortings.empty() ? limits : partial_sortings;
for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
{
info_limit_or_sort->parent->forEachChild([&] (IBlockInputStream & child)
{
rows_before_limit += child.getProfileInfo().rows;
return false;
});
}
}
else
{
/// Then the data about `rows_before_limit` can be in `RemoteBlockInputStream` (come from a remote server).
BlockStreamProfileInfos remotes;
collectInfosForStreamsWithName("Remote", remotes);
collectInfosForStreamsWithName("TreeExecutor", remotes);
if (remotes.empty())
return;
for (const auto & info : remotes)
{
if (info->applied_limit)
{
applied_limit = true;
rows_before_limit += info->rows_before_limit;
}
}
}
}
}
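With the stream tree gone, rows_before_limit can no longer be discovered by walking parent and child streams; the pipeline side has to push the value in, and the accessors above simply latch calculated_rows_before_limit and return the stored fields. A sketch of the new contract (a setRowsBeforeLimit-style setter on this struct is assumed here; the call site is illustrative):

/// Somewhere in the pipeline code, once the limit is known:
void reportRowsBeforeLimit(BlockStreamProfileInfo & info, size_t rows)
{
    /// Assumed setter: stores applied_limit = true and rows_before_limit = rows,
    /// so that hasAppliedLimit() / getRowsBeforeLimit() can return them directly.
    info.setRowsBeforeLimit(rows);
}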

View File

@ -1,7 +1,6 @@
#pragma once
#include <base/types.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Common/Stopwatch.h>
#include <vector>
@ -16,9 +15,6 @@ class WriteBuffer;
/// Information for profiling. See IBlockInputStream.h
struct BlockStreamProfileInfo
{
/// Info about stream object this profile info refers to.
IBlockInputStream * parent = nullptr;
bool started = false;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time
@ -28,9 +24,6 @@ struct BlockStreamProfileInfo
using BlockStreamProfileInfos = std::vector<const BlockStreamProfileInfo *>;
/// Collect BlockStreamProfileInfo for the nearest sources in the tree named `name`. Example: collect all info for PartialSorting streams.
void collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const;
/** Get the number of rows if there were no LIMIT.
* If there is no LIMIT, 0 is returned.
* If the query does not contain ORDER BY, the number can be underestimated - return the number of rows in blocks that were read before LIMIT reached.
@ -59,8 +52,6 @@ struct BlockStreamProfileInfo
}
private:
void calculateRowsBeforeLimit() const;
/// For these fields we make accessors, because they must be calculated beforehand.
mutable bool applied_limit = false; /// Whether LIMIT was applied
mutable size_t rows_before_limit = 0;

View File

@ -1,359 +0,0 @@
#include <DataStreams/IBlockInputStream.h>
#include <Core/Field.h>
#include <Interpreters/ProcessList.h>
#include <Access/EnabledQuota.h>
#include <Common/CurrentThread.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
extern const Event SelectedRows;
extern const Event SelectedBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_WAS_CANCELLED;
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int LOGICAL_ERROR;
}
/// It's safe to access children without a mutex as long as these methods are called before the first call to `read()` or `readPrefix()`.
Block IBlockInputStream::read()
{
if (total_rows_approx)
{
progressImpl(Progress(0, 0, total_rows_approx));
total_rows_approx = 0;
}
if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}
Block res;
if (isCancelledOrThrowIfKilled())
return res;
if (!checkTimeLimit())
limit_exceeded_need_break = true;
if (!limit_exceeded_need_break)
res = readImpl();
if (res)
{
info.update(res);
if (enabled_extremes)
updateExtremes(res);
if (limits.mode == LimitsMode::LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;
if (quota)
checkQuota(res);
}
else
{
/** If the stream is over, then we will ask all children to abort the execution.
* This makes sense when running a query with LIMIT
* - there is a situation when all the necessary data has already been read,
* but children sources are still working,
* and they can work in separate threads or even remotely.
*/
cancel(false);
}
progress(Progress(res.rows(), res.bytes()));
#ifndef NDEBUG
if (res)
{
Block header = getHeader();
if (header)
assertBlocksHaveEqualStructure(res, header, getName());
}
#endif
return res;
}
void IBlockInputStream::readPrefix()
{
#ifndef NDEBUG
if (!read_prefix_is_called)
read_prefix_is_called = true;
else
throw Exception("readPrefix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
#endif
readPrefixImpl();
forEachChild([&] (IBlockInputStream & child)
{
child.readPrefix();
return false;
});
}
void IBlockInputStream::readSuffix()
{
#ifndef NDEBUG
if (!read_suffix_is_called)
read_suffix_is_called = true;
else
throw Exception("readSuffix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
#endif
forEachChild([&] (IBlockInputStream & child)
{
child.readSuffix();
return false;
});
readSuffixImpl();
}
void IBlockInputStream::updateExtremes(Block & block)
{
size_t num_columns = block.columns();
if (!extremes)
{
MutableColumns extremes_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnPtr & src = block.safeGetByPosition(i).column;
if (isColumnConst(*src))
{
/// Equal min and max.
extremes_columns[i] = src->cloneResized(2);
}
else
{
Field min_value;
Field max_value;
src->getExtremes(min_value, max_value);
extremes_columns[i] = src->cloneEmpty();
extremes_columns[i]->insert(min_value);
extremes_columns[i]->insert(max_value);
}
}
extremes = block.cloneWithColumns(std::move(extremes_columns));
}
else
{
for (size_t i = 0; i < num_columns; ++i)
{
ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
if (isColumnConst(*old_extremes))
continue;
Field min_value = (*old_extremes)[0];
Field max_value = (*old_extremes)[1];
Field cur_min_value;
Field cur_max_value;
block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
if (cur_min_value < min_value)
min_value = cur_min_value;
if (cur_max_value > max_value)
max_value = cur_max_value;
MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
new_extremes->insert(min_value);
new_extremes->insert(max_value);
old_extremes = std::move(new_extremes);
}
}
}
bool IBlockInputStream::checkTimeLimit() const
{
return limits.speed_limits.checkTimeLimit(info.total_stopwatch, limits.timeout_overflow_mode);
}
void IBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)
{
case LimitsMode::LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LimitsMode::LIMITS_CURRENT:
{
UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds();
quota->used({Quota::RESULT_ROWS, block.rows()}, {Quota::RESULT_BYTES, block.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed});
prev_elapsed = total_elapsed;
break;
}
}
}
void IBlockInputStream::progressImpl(const Progress & value)
{
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel(/* kill */ true);
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LimitsMode::LIMITS_TOTAL)
{
if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
cancel(false);
}
size_t total_rows = progress.total_rows_to_read;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota && limits.mode == LimitsMode::LIMITS_TOTAL)
quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes});
}
ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes);
}
void IBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
forEachChild([&] (IBlockInputStream & child)
{
child.cancel(kill);
return false;
});
}
bool IBlockInputStream::isCancelled() const
{
return is_cancelled;
}
bool IBlockInputStream::isCancelledOrThrowIfKilled() const
{
if (!is_cancelled)
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
void IBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
progress_callback = callback;
forEachChild([&] (IBlockInputStream & child)
{
child.setProgressCallback(callback);
return false;
});
}
void IBlockInputStream::setProcessListElement(QueryStatus * elem)
{
process_list_elem = elem;
forEachChild([&] (IBlockInputStream & child)
{
child.setProcessListElement(elem);
return false;
});
}
Block IBlockInputStream::getTotals()
{
if (totals)
return totals;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getTotals();
return bool(res);
});
return res;
}
Block IBlockInputStream::getExtremes()
{
if (extremes)
return extremes;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getExtremes();
return bool(res);
});
return res;
}
}

View File

@ -1,271 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/ExecutionSpeedLimits.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/StreamLocalLimits.h>
#include <IO/Progress.h>
#include <Storages/TableLockHolder.h>
#include <Common/TypePromotion.h>
#include <atomic>
#include <shared_mutex>
namespace DB
{
namespace ErrorCodes
{
}
class ProcessListElement;
class EnabledQuota;
class QueryStatus;
/** The stream interface for reading data by blocks from the database.
* Relational operations are also supposed to be implemented on top of this interface.
* Keeps track of how the source of the blocks works.
* Lets you get information for profiling: rows per second, blocks per second, megabytes per second, etc.
* Allows you to stop reading data (in nested sources).
*/
class IBlockInputStream : public TypePromotion<IBlockInputStream>
{
friend struct BlockStreamProfileInfo;
public:
IBlockInputStream() { info.parent = this; }
virtual ~IBlockInputStream() = default;
IBlockInputStream(const IBlockInputStream &) = delete;
IBlockInputStream & operator=(const IBlockInputStream &) = delete;
/// To output the data stream transformation tree (query execution plan).
virtual String getName() const = 0;
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* It is guaranteed that method "read" returns blocks of exactly that structure.
*/
virtual Block getHeader() const = 0;
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
return none;
}
/** Read next block.
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
* This also applies to readPrefix and readSuffix.
*/
Block read();
/** Read something before starting all data or after the end of all data.
* In the `readSuffix` function, you can implement a finalization that can lead to an exception.
* readPrefix() must be called before the first call to read().
* readSuffix() should be called after read() returns an empty block, or after a call to cancel(), but not during read() execution.
*/
/** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.
* There are cases when you do not want the children's `readPrefix` to be called synchronously from this function,
* but want them to be called, for example, from separate threads (for parallel initialization of children).
* In that case, override the `readPrefix` function.
*/
virtual void readPrefix();
/** The default implementation calls recursively readSuffix() on all children, and then readSuffixImpl() on itself.
* If this stream calls children's read() in a separate thread, this behavior is usually incorrect:
* the child's readSuffix() cannot be called while that same child's read() is executing in another thread.
* In that case, override this method so that readSuffix() in the children is called, for example, after the threads are joined.
*/
virtual void readSuffix();
/// Do not allow the table to be changed while the block stream and its children are alive.
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
/** Get "total" values.
* The default implementation takes them from itself or from the first child source in which they are.
* The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source.
* There can be no total values - then an empty block is returned.
*
* Call this method only after all the data has been retrieved with `read`,
* otherwise there will be problems if any data is being computed in another thread at the same time.
*/
virtual Block getTotals();
/// The same for minimums and maximums.
virtual Block getExtremes();
/** Set the execution progress bar callback.
* The callback is passed to all child sources.
* By default, it is called for leaf sources, after each block.
* (But this can be overridden in the progress() method)
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
virtual void setProgressCallback(const ProgressCallback & callback);
/** In this method:
* - the progress callback is called;
* - the status of the query execution in ProcessList is updated;
* - checks restrictions and quotas that should be checked not within the same source,
* but over the total amount of resources spent in all sources at once (information in the ProcessList).
*/
virtual void progress(const Progress & value)
{
/// The data for progress is taken from leaf sources.
if (children.empty())
progressImpl(value);
}
void progressImpl(const Progress & value);
/** Set the pointer to the process list item.
* It is passed to all child sources.
* General information about the resources spent on the query will be written into it.
* Based on this information, the quota and some restrictions will be checked.
* This information will also be available in the SHOW PROCESSLIST query.
*/
virtual void setProcessListElement(QueryStatus * elem);
/** Set the approximate total number of rows to read.
*/
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
/** Ask to abort the receipt of data as soon as possible.
* By default - just sets the flag is_cancelled and asks that all children be interrupted.
* This function can be called several times, including simultaneously from different threads.
* Has two modes:
* with kill = false, only is_cancelled is set: streams will stop silently, returning some processed data.
* with kill = true, is_killed is also set: queries will stop with an exception.
*/
virtual void cancel(bool kill);
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** Set limitations that are checked on each block. */
virtual void setLimits(const StreamLocalLimits & limits_)
{
limits = limits_;
}
const StreamLocalLimits & getLimits() const
{
return limits;
}
/** Set the quota. If you set a quota on the amount of raw data,
* then you should also set mode = LIMITS_TOTAL in LocalLimits via setLimits.
*/
virtual void setQuota(const std::shared_ptr<const EnabledQuota> & new_quota)
{
quota = new_quota;
}
/// Enable calculation of minimums and maximums by the result columns.
void enableExtremes() { enabled_extremes = true; }
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
std::vector<TableLockHolder> table_locks;
BlockInputStreams children;
std::shared_mutex children_mutex;
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// According to total_stopwatch in microseconds
UInt64 last_profile_events_update_time = 0;
/// Additional information that can be generated during the work process.
/// Total values during aggregation.
Block totals;
/// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
Block extremes;
void addChild(const BlockInputStreamPtr & child)
{
std::unique_lock lock(children_mutex);
children.push_back(child);
}
/** Check limits.
* But only those that can be checked within each separate stream.
*/
bool checkTimeLimit() const;
#ifndef NDEBUG
bool read_prefix_is_called = false;
bool read_suffix_is_called = false;
#endif
private:
bool enabled_extremes = false;
/// The limit on the number of rows/bytes has been exceeded, and execution needs to stop on the next `read` call, as if the stream were exhausted.
bool limit_exceeded_need_break = false;
/// Limitations and quotas.
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota; /// If nullptr - the quota is not used.
UInt64 prev_elapsed = 0;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
/// Derived classes must implement this function.
virtual Block readImpl() = 0;
/// Here you can do a preliminary initialization.
virtual void readPrefixImpl() {}
/// Here you need to do a finalization, which can lead to an exception.
virtual void readSuffixImpl() {}
void updateExtremes(Block & block);
/** Check quotas.
* But only those that can be checked within each separate stream.
*/
void checkQuota(Block & block);
size_t checkDepthImpl(size_t max_depth, size_t level) const;
template <typename F>
void forEachChild(F && f)
{
/// NOTE: Acquire a read lock, therefore f() should be thread safe
std::shared_lock lock(children_mutex);
// Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex.
const auto children_copy = children;
lock.unlock();
for (auto & child : children_copy)
if (f(*child))
return;
}
};
}
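Custom streams that implemented this interface migrate to ISource, where readImpl() becomes generate() returning Chunks, and cancellation, progress, limits and quotas are handled by the pipeline machinery instead. A minimal sketch of the replacement shape (illustrative; readNextBlock is a hypothetical stand-in for a former readImpl body):

class MySource : public ISource
{
public:
    explicit MySource(Block header_) : ISource(std::move(header_)) {}

    String getName() const override { return "MySource"; }

protected:
    Chunk generate() override
    {
        Block block = readNextBlock(); /// hypothetical helper: the former readImpl() body
        if (!block)
            return {}; /// an empty chunk signals the end of data
        size_t num_rows = block.rows();
        return Chunk(block.getColumns(), num_rows);
    }
};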

View File

@ -1,70 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Storages/TableLockHolder.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <string>
#include <vector>
namespace DB
{
struct Progress;
/** Interface of stream for writing data (into table, filesystem, network, terminal, etc.)
*/
class IBlockOutputStream : private boost::noncopyable
{
public:
IBlockOutputStream() = default;
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* You must pass blocks of exactly this structure to the 'write' method.
*/
virtual Block getHeader() const = 0;
/** Write block.
*/
virtual void write(const Block & block) = 0;
/** Write or do something before all data or after all data.
*/
virtual void writePrefix() {}
virtual void writeSuffix() {}
/** Flush output buffers if any.
*/
virtual void flush() {}
/** Methods to set additional information for output in formats, that support it.
*/
virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
virtual void setTotals(const Block & /*totals*/) {}
virtual void setExtremes(const Block & /*extremes*/) {}
/** Notify about progress. Method could be called from different threads.
* The passed values are deltas that must be summed.
*/
virtual void onProgress(const Progress & /*progress*/) {}
/** Content-Type to set when sending HTTP response.
*/
virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
virtual ~IBlockOutputStream() = default;
/** Don't allow the table to be altered while an instance of the stream is alive.
*/
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
private:
std::vector<TableLockHolder> table_locks;
};
}

View File

@ -1,17 +0,0 @@
#pragma once
#include <memory>
#include <vector>
namespace DB
{
class IBlockInputStream;
class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
using BlockOutputStreams = std::vector<BlockOutputStreamPtr>;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <IO/WriteBuffer.h>
#include <Core/Block.h>
namespace DB

View File

@ -1,34 +0,0 @@
#pragma once
#include <DataStreams/materializeBlock.h>
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
/** Converts constant columns to full columns ("materializes" them).
*/
class MaterializingBlockOutputStream : public IBlockOutputStream
{
public:
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output_, const Block & header_)
: output{output_}, header(header_) {}
Block getHeader() const override { return header; }
void write(const Block & block) override { output->write(materializeBlock(block)); }
void flush() override { output->flush(); }
void writePrefix() override { output->writePrefix(); }
void writeSuffix() override { output->writeSuffix(); }
void setRowsBeforeLimit(size_t rows_before_limit) override { output->setRowsBeforeLimit(rows_before_limit); }
void setTotals(const Block & totals) override { output->setTotals(materializeBlock(totals)); }
void setExtremes(const Block & extremes) override { output->setExtremes(materializeBlock(extremes)); }
void onProgress(const Progress & progress) override { output->onProgress(progress); }
String getContentType() const override { return output->getContentType(); }
private:
BlockOutputStreamPtr output;
Block header;
};
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Common/PODArray.h>
#include <Core/Block.h>
namespace DB
{

View File

@ -1,41 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** A stream of blocks from which you can read one block.
*/
class OneBlockInputStream : public IBlockInputStream
{
public:
explicit OneBlockInputStream(Block block_) : block(std::move(block_)) { block.checkNumberOfRows(); }
String getName() const override { return "One"; }
Block getHeader() const override
{
Block res;
for (const auto & elem : block)
res.insert({ elem.column->cloneEmpty(), elem.type, elem.name });
return res;
}
protected:
Block readImpl() override
{
if (has_been_read)
return Block();
has_been_read = true;
return block;
}
private:
Block block;
bool has_been_read = false;
};
}
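In practice OneBlockInputStream is superseded by SourceFromSingleChunk; note the includes this commit adds for it elsewhere in the diff. A short sketch of the substitution (block is a placeholder; the Block overload of the SourceFromSingleChunk constructor is assumed):

/// Old: BlockInputStreamPtr stream = std::make_shared<OneBlockInputStream>(block);
/// New: a single-chunk source feeding a pipeline.
auto source = std::make_shared<SourceFromSingleChunk>(std::move(block));
QueryPipeline pipeline(Pipe(std::move(source)));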

View File

@ -1,32 +0,0 @@
#include <DataStreams/SquashingBlockInputStream.h>
namespace DB
{
SquashingBlockInputStream::SquashingBlockInputStream(
const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory)
: header(src->getHeader()), transform(min_block_size_rows, min_block_size_bytes, reserve_memory)
{
children.emplace_back(src);
}
Block SquashingBlockInputStream::readImpl()
{
while (!all_read)
{
Block block = children[0]->read();
if (!block)
all_read = true;
auto squashed_block = transform.add(std::move(block));
if (squashed_block)
{
return squashed_block;
}
}
return {};
}
}

View File

@ -1,31 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/SquashingTransform.h>
namespace DB
{
/** Merges consecutive blocks of a stream until a specified minimum size is reached.
*/
class SquashingBlockInputStream : public IBlockInputStream
{
public:
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes,
bool reserve_memory = false);
String getName() const override { return "Squashing"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Block header;
SquashingTransform transform;
bool all_read = false;
};
}

View File

@ -1,54 +0,0 @@
#include <DataStreams/SquashingBlockOutputStream.h>
namespace DB
{
SquashingBlockOutputStream::SquashingBlockOutputStream(BlockOutputStreamPtr dst, Block header_, size_t min_block_size_rows, size_t min_block_size_bytes)
: output(std::move(dst)), header(std::move(header_)), transform(min_block_size_rows, min_block_size_bytes)
{
}
void SquashingBlockOutputStream::write(const Block & block)
{
auto squashed_block = transform.add(block);
if (squashed_block)
output->write(squashed_block);
}
void SquashingBlockOutputStream::finalize()
{
if (all_written)
return;
all_written = true;
auto squashed_block = transform.add({});
if (squashed_block)
output->write(squashed_block);
}
void SquashingBlockOutputStream::flush()
{
if (!disable_flush)
finalize();
output->flush();
}
void SquashingBlockOutputStream::writePrefix()
{
output->writePrefix();
}
void SquashingBlockOutputStream::writeSuffix()
{
finalize();
output->writeSuffix();
}
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/SquashingTransform.h>
namespace DB
{
/** Merges consecutive blocks of a stream until a specified minimum size is reached.
*/
class SquashingBlockOutputStream : public IBlockOutputStream
{
public:
SquashingBlockOutputStream(BlockOutputStreamPtr dst, Block header_, size_t min_block_size_rows, size_t min_block_size_bytes);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
void writePrefix() override;
void writeSuffix() override;
/// Don't write blocks smaller than the specified size, even when the user calls the flush method.
void disableFlush() { disable_flush = true; }
private:
BlockOutputStreamPtr output;
Block header;
SquashingTransform transform;
bool all_written = false;
void finalize();
bool disable_flush = false;
};
}

View File

@ -1,8 +1,6 @@
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <DataStreams/copyData.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Compression/CompressedWriteBuffer.h>

View File

@ -4,7 +4,6 @@
#include <Processors/QueryPipelineBuilder.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/NativeReader.h>
namespace DB

View File

@ -1,86 +0,0 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Common/ThreadPool.h>
namespace DB
{
namespace
{
bool isAtomicSet(std::atomic<bool> * val)
{
return ((val != nullptr) && val->load(std::memory_order_seq_cst));
}
}
template <typename TCancelCallback, typename TProgressCallback>
void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCallback && is_cancelled, TProgressCallback && progress)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
{
if (is_cancelled())
break;
to.write(block);
progress(block);
}
if (is_cancelled())
return;
/// For outputting additional information in some formats.
if (from.getProfileInfo().hasAppliedLimit())
to.setRowsBeforeLimit(from.getProfileInfo().getRowsBeforeLimit());
to.setTotals(from.getTotals());
to.setExtremes(from.getExtremes());
if (is_cancelled())
return;
from.readSuffix();
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, progress);
}
inline void doNothing(const Block &) {}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, doNothing);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled, doNothing);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress)
{
copyDataImpl(from, to, is_cancelled, progress);
}
}
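The copyData driver loop goes away with the interfaces; its replacement is to complete a pipeline with a write side and run an executor over the whole graph. A rough sketch of the equivalent (assuming a ready Pipe `source` and a `sink` such as a SinkToStorage; the complete() overload for sinks is assumed):

QueryPipeline pipeline(std::move(source));
pipeline.complete(std::move(sink)); /// attaches the write side; replaces to.write(...)

CompletedPipelineExecutor executor(pipeline); /// plays the role of the read()/write() loop above
executor.execute();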

View File

@ -1,27 +0,0 @@
#pragma once
#include <DataStreams/IBlockStream_fwd.h>
#include <atomic>
#include <functional>
namespace DB
{
class Block;
/** Copies data from the InputStream into the OutputStream
* (for example, from the database to the console, etc.)
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress);
}

View File

@ -6,7 +6,7 @@
#include <Columns/ColumnsNumber.h>
#include <Processors/Pipe.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
using namespace DB;
@ -88,15 +88,18 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
pipe.addTransform(std::move(transform));
QueryPipeline pipeline(std::move(pipe));
pipeline.setNumThreads(1);
auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
PullingPipelineExecutor executor(pipeline);
size_t total_rows = 0;
auto block1 = stream->read();
auto block2 = stream->read();
auto block3 = stream->read();
Block block1;
Block block2;
Block block3;
executor.pull(block1);
executor.pull(block2);
executor.pull(block3);
EXPECT_EQ(stream->read(), Block());
Block tmp_block;
ASSERT_FALSE(executor.pull(tmp_block));
for (const auto & block : {block1, block2, block3})
total_rows += block.rows();
@ -132,14 +135,17 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
pipe.addTransform(std::move(transform));
QueryPipeline pipeline(std::move(pipe));
pipeline.setNumThreads(1);
auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
PullingPipelineExecutor executor(pipeline);
auto block1 = stream->read();
auto block2 = stream->read();
auto block3 = stream->read();
Block block1;
Block block2;
Block block3;
executor.pull(block1);
executor.pull(block2);
executor.pull(block3);
EXPECT_EQ(stream->read(), Block());
Block tmp_block;
ASSERT_FALSE(executor.pull(tmp_block));
EXPECT_EQ(block1.rows(), (1000 + 1500 + 1400) / 3);
EXPECT_EQ(block2.rows(), (1000 + 1500 + 1400) / 3);

View File

@ -5,7 +5,6 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Context.h>

View File

@ -14,8 +14,6 @@
# include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Processors/Sources/SourceFromSingleChunk.h>
# include <Processors/Transforms/CountingTransform.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/copyData.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLSource.h>

View File

@ -1,7 +1,6 @@
#include "DictionarySourceHelpers.h"
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include "DictionaryStructure.h"

View File

@ -1,5 +1,4 @@
#include "HTTPDictionarySource.h"
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/formatBlock.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>

View File

@ -1,6 +1,5 @@
#include "LibraryDictionarySource.h"
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Context.h>
#include <base/logger_useful.h>
#include <Common/filesystemHelpers.h>

View File

@ -3,7 +3,6 @@
#include <Columns/ColumnString.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeoutsContext.h>
@ -18,6 +17,7 @@
#include "registerDictionaries.h"
#include <Common/escapeForFileName.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Formats/IInputFormat.h>
namespace DB

View File

@ -4,11 +4,9 @@
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context_fwd.h>
#include <IO/BufferWithOwnMemory.h>

View File

@ -19,7 +19,6 @@
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <Disks/SingleDiskVolume.h>

View File

@ -19,7 +19,6 @@
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>

View File

@ -5,7 +5,6 @@
#include <Core/NamesAndTypes.h>
#include <Core/Settings.h>
#include <Core/UUID.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h>

View File

@ -35,7 +35,6 @@
#include <Storages/StorageDictionary.h>
#include <Storages/StorageJoin.h>
#include <DataStreams/copyData.h>
#include <Dictionaries/DictionaryStructure.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Columns/FilterDescription.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/SubqueryForSet.h>

View File

@ -2,7 +2,6 @@
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h>

View File

@ -3,8 +3,6 @@
#include <Access/AccessFlags.h>
#include <Columns/ColumnNullable.h>
#include <Processors/Transforms/buildPushingToViewsChain.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -20,7 +18,6 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/CheckConstraintsTransform.h>
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>

View File

@ -1,6 +1,5 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlockIO.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/ASTInsertQuery.h>

View File

@ -1,4 +1,3 @@
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -64,7 +63,6 @@
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
@ -158,24 +156,16 @@ InterpreterSelectQuery::InterpreterSelectQuery(
ContextPtr context_,
const SelectQueryOptions & options_,
const Names & required_result_column_names_)
: InterpreterSelectQuery(query_ptr_, context_, nullptr, std::nullopt, nullptr, options_, required_result_column_names_)
: InterpreterSelectQuery(query_ptr_, context_, std::nullopt, nullptr, options_, required_result_column_names_)
{
}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const BlockInputStreamPtr & input_,
const SelectQueryOptions & options_)
: InterpreterSelectQuery(query_ptr_, context_, input_, std::nullopt, nullptr, options_.copy().noSubquery())
{}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
Pipe input_pipe_,
const SelectQueryOptions & options_)
: InterpreterSelectQuery(query_ptr_, context_, nullptr, std::move(input_pipe_), nullptr, options_.copy().noSubquery())
: InterpreterSelectQuery(query_ptr_, context_, std::move(input_pipe_), nullptr, options_.copy().noSubquery())
{}
InterpreterSelectQuery::InterpreterSelectQuery(
@ -184,7 +174,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const StoragePtr & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const SelectQueryOptions & options_)
: InterpreterSelectQuery(query_ptr_, context_, nullptr, std::nullopt, storage_, options_.copy().noSubquery(), {}, metadata_snapshot_)
: InterpreterSelectQuery(query_ptr_, context_, std::nullopt, storage_, options_.copy().noSubquery(), {}, metadata_snapshot_)
{}
InterpreterSelectQuery::~InterpreterSelectQuery() = default;
@ -268,7 +258,6 @@ static bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const BlockInputStreamPtr & input_,
std::optional<Pipe> input_pipe_,
const StoragePtr & storage_,
const SelectQueryOptions & options_,
@ -277,7 +266,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// NOTE: the query almost always should be cloned because it will be modified during analysis.
: IInterpreterUnionOrSelectQuery(options_.modify_inplace ? query_ptr_ : query_ptr_->clone(), context_, options_)
, storage(storage_)
, input(input_)
, input_pipe(std::move(input_pipe_))
, log(&Poco::Logger::get("InterpreterSelectQuery"))
, metadata_snapshot(metadata_snapshot_)
@ -294,13 +282,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
throw Exception("Too deep subqueries. Maximum: " + settings.max_subquery_depth.toString(),
ErrorCodes::TOO_DEEP_SUBQUERIES);
bool has_input = input || input_pipe;
if (input)
{
/// Read from prepared input.
source_header = input->getHeader();
}
else if (input_pipe)
bool has_input = input_pipe != std::nullopt;
if (input_pipe)
{
/// Read from prepared input.
source_header = input_pipe->getHeader();
@ -450,17 +433,17 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!options.only_analyze)
{
if (query.sampleSize() && (input || input_pipe || !storage || !storage->supportsSampling()))
if (query.sampleSize() && (input_pipe || !storage || !storage->supportsSampling()))
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
if (query.final() && (input || input_pipe || !storage || !storage->supportsFinal()))
if (query.final() && (input_pipe || !storage || !storage->supportsFinal()))
throw Exception(
(!input && !input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL",
(!input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL",
ErrorCodes::ILLEGAL_FINAL);
if (query.prewhere() && (input || input_pipe || !storage || !storage->supportsPrewhere()))
if (query.prewhere() && (input_pipe || !storage || !storage->supportsPrewhere()))
throw Exception(
(!input && !input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE",
(!input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE",
ErrorCodes::ILLEGAL_PREWHERE);
/// Save the new temporary tables in the query context
@ -578,7 +561,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{
executeImpl(query_plan, input, std::move(input_pipe));
executeImpl(query_plan, std::move(input_pipe));
/// We must guarantee that result structure is the same as in getSampleBlock()
///
@ -926,7 +909,7 @@ static bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query)
}
void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe)
void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe)
{
/** Streams of data. When the query is executed in parallel, we have several data streams.
* If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
@ -1010,13 +993,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
}
else
{
if (prepared_input)
{
auto prepared_source_step
= std::make_unique<ReadFromPreparedSource>(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)), context);
query_plan.addStep(std::move(prepared_source_step));
}
else if (prepared_pipe)
if (prepared_pipe)
{
auto prepared_source_step = std::make_unique<ReadFromPreparedSource>(std::move(*prepared_pipe), context);
query_plan.addStep(std::move(prepared_source_step));
@ -1580,7 +1557,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
{
if (!expressions.prewhere_info)
{
const bool does_storage_support_prewhere = !input && !input_pipe && storage && storage->supportsPrewhere();
const bool does_storage_support_prewhere = !input_pipe && storage && storage->supportsPrewhere();
if (does_storage_support_prewhere && shouldMoveToPrewhere())
{
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
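Callers that used the removed BlockInputStreamPtr constructor now wrap their prepared source in a Pipe and use the pipe-based constructor kept above. A hypothetical call site, sketched (prepared_block is a placeholder):

/// Before: InterpreterSelectQuery interpreter(query_ast, context, prepared_stream, options);
/// After: any prepared source becomes a Pipe.
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(prepared_block)));
InterpreterSelectQuery interpreter(query_ast, context, std::move(pipe), SelectQueryOptions());
auto io = interpreter.execute();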

View File

@ -3,7 +3,6 @@
#include <memory>
#include <Core/QueryProcessingStage.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
@ -52,13 +51,6 @@ public:
const SelectQueryOptions &,
const Names & required_result_column_names_ = Names{});
/// Read data not from the table specified in the query, but from the prepared source `input`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const BlockInputStreamPtr & input_,
const SelectQueryOptions & = {});
/// Read data not from the table specified in the query, but from the prepared pipe `input`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
@ -108,7 +100,6 @@ private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const BlockInputStreamPtr & input_,
std::optional<Pipe> input_pipe,
const StoragePtr & storage_,
const SelectQueryOptions &,
@ -122,7 +113,7 @@ private:
Block getSampleBlockImpl();
void executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
void executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe);
/// Different stages of query execution.
@ -198,7 +189,6 @@ private:
TableLockHolder table_lock;
/// Used when we read from prepared input, not table or subquery.
BlockInputStreamPtr input;
std::optional<Pipe> input_pipe;
Poco::Logger * log;

View File

@ -3,7 +3,6 @@
#include <Parsers/formatAST.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>

View File

@ -15,7 +15,6 @@ limitations under the License. */
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/Context.h>
#include <Access/AccessFlags.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/StreamLocalLimits.h>

View File

@ -38,8 +38,6 @@ private:
/// Table from where to read data, if not subquery.
StoragePtr storage;
/// Streams of read data
BlockInputStreams streams;
};
}

View File

@ -5,7 +5,6 @@
#include <Core/Block.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB

View File

@ -15,7 +15,7 @@
#include <Processors/Sources/BlocksListSource.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace DB
@ -592,9 +592,10 @@ void MergeJoin::mergeInMemoryRightBlocks()
builder.getHeader(), right_sort_description, max_rows_in_right_block, 0, 0, 0, 0, nullptr, 0));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
auto sorted_input = PipelineExecutingBlockInputStream(std::move(pipeline));
PullingPipelineExecutor executor(pipeline);
while (Block block = sorted_input.read())
Block block;
while (executor.pull(block))
{
if (!block.rows())
continue;

View File

@ -16,7 +16,6 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Parsers/ASTIdentifier.h>

View File

@ -11,7 +11,6 @@
#include <Common/Exception.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/IBlockInputStream.h>
#include <base/logger_useful.h>
#include <chrono>

View File

@ -8,7 +8,6 @@
#include <Core/SortDescription.h>
#include <Processors/Pipe.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB

View File

@ -7,7 +7,6 @@
#include <Interpreters/IJoin.h>
#include <Interpreters/join_common.h>
#include <Interpreters/asof.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <DataTypes/getLeastSupertype.h>
#include <Storages/IStorage_fwd.h>

View File

@ -4,7 +4,6 @@
#include <Processors/Sources/SourceWithProgress.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <DataStreams/IBlockInputStream.h>
namespace zkutil

View File

@ -10,8 +10,6 @@
#include <IO/copyData.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>

View File

@ -1,124 +0,0 @@
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipelineBuilder.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
PipelineExecutingBlockInputStream::PipelineExecutingBlockInputStream(QueryPipeline pipeline_)
: pipeline(std::make_unique<QueryPipeline>(std::move(pipeline_)))
{
}
PipelineExecutingBlockInputStream::~PipelineExecutingBlockInputStream() = default;
Block PipelineExecutingBlockInputStream::getHeader() const
{
if (executor)
return executor->getHeader();
if (async_executor)
return async_executor->getHeader();
return pipeline->getHeader();
}
void PipelineExecutingBlockInputStream::createExecutor()
{
if (pipeline->getNumThreads() > 1)
async_executor = std::make_unique<PullingAsyncPipelineExecutor>(*pipeline);
else
executor = std::make_unique<PullingPipelineExecutor>(*pipeline);
is_execution_started = true;
}
void PipelineExecutingBlockInputStream::readPrefixImpl()
{
createExecutor();
}
Block PipelineExecutingBlockInputStream::readImpl()
{
if (!is_execution_started)
createExecutor();
Block block;
bool can_continue = true;
while (can_continue)
{
if (executor)
can_continue = executor->pull(block);
else
can_continue = async_executor->pull(block);
if (block)
return block;
}
totals = executor ? executor->getTotalsBlock()
: async_executor->getTotalsBlock();
extremes = executor ? executor->getExtremesBlock()
: async_executor->getExtremesBlock();
return {};
}
inline static void throwIfExecutionStarted(bool is_execution_started, const char * method)
{
if (is_execution_started)
throw Exception(String("Cannot call ") + method +
" for PipelineExecutingBlockInputStream because execution was started",
ErrorCodes::LOGICAL_ERROR);
}
void PipelineExecutingBlockInputStream::cancel(bool kill)
{
IBlockInputStream::cancel(kill);
if (is_execution_started)
{
executor ? executor->cancel()
: async_executor->cancel();
}
}
void PipelineExecutingBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
throwIfExecutionStarted(is_execution_started, "setProgressCallback");
pipeline->setProgressCallback(callback);
}
void PipelineExecutingBlockInputStream::setProcessListElement(QueryStatus * elem)
{
throwIfExecutionStarted(is_execution_started, "setProcessListElement");
IBlockInputStream::setProcessListElement(elem);
pipeline->setProcessListElement(elem);
}
void PipelineExecutingBlockInputStream::setLimits(const StreamLocalLimits & limits_)
{
throwIfExecutionStarted(is_execution_started, "setLimits");
if (limits_.mode == LimitsMode::LIMITS_TOTAL)
throw Exception("Total limits are not supported by PipelineExecutingBlockInputStream",
ErrorCodes::LOGICAL_ERROR);
/// Local limits may be checked by IBlockInputStream itself.
IBlockInputStream::setLimits(limits_);
}
void PipelineExecutingBlockInputStream::setQuota(const std::shared_ptr<const EnabledQuota> &)
{
throw Exception("Quota is not supported by PipelineExecutingBlockInputStream",
ErrorCodes::LOGICAL_ERROR);
}
}
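
With this temporary wrapper removed, callers pull from the QueryPipeline directly. A minimal sketch of the replacement pattern, mirroring the removed createExecutor()/readImpl() logic (executor choice by thread count, empty blocks skipped, totals fetched at the end); the function name consume and its ready-made pipeline argument are illustrative assumptions, not part of this commit:

    #include <Core/Block.h>
    #include <Processors/Executors/PullingAsyncPipelineExecutor.h>
    #include <Processors/Executors/PullingPipelineExecutor.h>
    #include <Processors/QueryPipeline.h>

    namespace DB
    {

    /// Drain the pipeline and return its totals, choosing the executor the same
    /// way the removed wrapper did: async when the pipeline uses several threads.
    static Block consume(QueryPipeline & pipeline)
    {
        if (pipeline.getNumThreads() > 1)
        {
            PullingAsyncPipelineExecutor executor(pipeline);
            Block block;
            while (executor.pull(block))
            {
                /// pull() may return true with an empty block; process non-empty ones here.
            }
            return executor.getTotalsBlock();
        }

        PullingPipelineExecutor executor(pipeline);
        Block block;
        while (executor.pull(block))
        {
            /// Same contract as above, single-threaded.
        }
        return executor.getTotalsBlock();
    }

    }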

View File

@ -1,44 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class QueryPipeline;
class PullingAsyncPipelineExecutor;
class PullingPipelineExecutor;
/// Implement IBlockInputStream from QueryPipeline.
/// It's a temporary wrapper.
class PipelineExecutingBlockInputStream : public IBlockInputStream
{
public:
explicit PipelineExecutingBlockInputStream(QueryPipeline pipeline_);
~PipelineExecutingBlockInputStream() override;
String getName() const override { return "PipelineExecuting"; }
Block getHeader() const override;
void cancel(bool kill) override;
/// Implement IBlockInputStream methods via QueryPipeline.
void setProgressCallback(const ProgressCallback & callback) final;
void setProcessListElement(QueryStatus * elem) final;
void setLimits(const StreamLocalLimits & limits_) final;
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final;
protected:
void readPrefixImpl() override;
Block readImpl() override;
private:
std::unique_ptr<QueryPipeline> pipeline;
/// One of executors is used.
std::unique_ptr<PullingPipelineExecutor> executor; /// for single thread.
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor; /// for many threads.
bool is_execution_started = false;
void createExecutor();
};
}

View File

@ -1,67 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Formats/IInputFormat.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class InputStreamFromInputFormat : public IBlockInputStream
{
public:
explicit InputStreamFromInputFormat(InputFormatPtr input_format_)
: input_format(std::move(input_format_))
, port(input_format->getPort().getHeader(), input_format.get())
{
connect(input_format->getPort(), port);
port.setNeeded();
}
String getName() const override { return input_format->getName(); }
Block getHeader() const override { return input_format->getPort().getHeader(); }
void cancel(bool kill) override
{
input_format->cancel();
IBlockInputStream::cancel(kill);
}
const BlockMissingValues & getMissingValues() const override { return input_format->getMissingValues(); }
protected:
Block readImpl() override
{
while (true)
{
auto status = input_format->prepare();
switch (status)
{
case IProcessor::Status::Ready:
input_format->work();
break;
case IProcessor::Status::Finished:
return {};
case IProcessor::Status::PortFull:
return input_format->getPort().getHeader().cloneWithColumns(port.pull().detachColumns());
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::ExpandPipeline:
throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
}
}
}
private:
InputFormatPtr input_format;
InputPort port;
};
}
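
The pattern that replaces this wrapper throughout the commit (library bridge, StorageHDFS and MergeTreeData below) wraps the IInputFormat in a QueryPipeline and pulls blocks through an executor. A minimal sketch, assuming format is an InputFormatPtr obtained from FormatFactory and processBlock is an illustrative callback:

    #include <Processors/Executors/PullingPipelineExecutor.h>
    #include <Processors/Pipe.h>
    #include <Processors/QueryPipeline.h>

    QueryPipeline pipeline(Pipe(std::move(format)));
    PullingPipelineExecutor executor(pipeline);

    Block block;
    while (executor.pull(block))
    {
        if (block) /// pull() can yield empty blocks between chunks
            processBlock(block);
    }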

View File

@ -1,43 +0,0 @@
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
namespace DB
{
Block OutputStreamToOutputFormat::getHeader() const
{
return output_format->getPort(IOutputFormat::PortKind::Main).getHeader();
}
void OutputStreamToOutputFormat::write(const Block & block)
{
output_format->write(block);
}
void OutputStreamToOutputFormat::writePrefix() { output_format->doWritePrefix(); }
void OutputStreamToOutputFormat::writeSuffix() { output_format->doWriteSuffix(); }
void OutputStreamToOutputFormat::flush() { output_format->flush(); }
void OutputStreamToOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)
{
output_format->setRowsBeforeLimit(rows_before_limit);
}
void OutputStreamToOutputFormat::setTotals(const Block & totals)
{
if (totals)
output_format->setTotals(totals);
}
void OutputStreamToOutputFormat::setExtremes(const Block & extremes)
{
if (extremes)
output_format->setExtremes(extremes);
}
void OutputStreamToOutputFormat::onProgress(const Progress & progress) { output_format->onProgress(progress); }
std::string OutputStreamToOutputFormat::getContentType() const { return output_format->getContentType(); }
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
/// Wrapper. Implements IBlockOutputStream interface using IOutputFormat object.
class OutputStreamToOutputFormat : public IBlockOutputStream
{
public:
explicit OutputStreamToOutputFormat(OutputFormatPtr output_format_) : output_format(std::move(output_format_)) {}
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;
void flush() override;
void setRowsBeforeLimit(size_t rows_before_limit) override;
void setTotals(const Block & totals) override;
void setExtremes(const Block & extremes) override;
void onProgress(const Progress & progress) override;
std::string getContentType() const override;
private:
OutputFormatPtr output_format;
};
}
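
Without this wrapper, writers drive IOutputFormat directly; the calls below are exactly the ones the removed methods forwarded to. A short sketch, assuming output_format is an OutputFormatPtr and blocks is an illustrative container of Blocks:

    /// One-to-one with the removed writePrefix()/write()/writeSuffix()/flush().
    output_format->doWritePrefix();
    for (const auto & block : blocks)
        output_format->write(block);
    output_format->doWriteSuffix();
    output_format->flush();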

View File

@ -1,6 +1,5 @@
#include <Processors/Pipe.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/LimitTransform.h>
@ -9,6 +8,7 @@
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Columns/ColumnConst.h>
@ -164,13 +164,7 @@ Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, Output
Pipe::Pipe(ProcessorPtr source)
{
if (auto * source_from_input_stream = typeid_cast<SourceFromInputStream *>(source.get()))
{
/// Special case for SourceFromInputStream. Will remove it later.
totals_port = source_from_input_stream->getTotalsPort();
extremes_port = source_from_input_stream->getExtremesPort();
}
else if (source->getOutputs().size() != 1)
if (source->getOutputs().size() != 1)
checkSource(*source);
if (collected_processors)

View File

@ -10,7 +10,6 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
@ -121,7 +120,6 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
std::vector<LimitTransform *> limits;
std::vector<SourceFromInputStream *> sources;
std::vector<RemoteSource *> remote_sources;
std::unordered_set<IProcessor *> visited;
@ -151,9 +149,6 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
limits.emplace_back(limit);
}
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
sources.emplace_back(source);
if (auto * source = typeid_cast<RemoteSource *>(processor))
remote_sources.emplace_back(source);
}
@ -186,16 +181,13 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
}
}
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty() || !remote_sources.empty()))
if (!rows_before_limit_at_least && (!limits.empty() || !remote_sources.empty()))
{
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
for (auto & limit : limits)
limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : remote_sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}

View File

@ -9,7 +9,6 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -468,7 +467,6 @@ void QueryPipelineBuilder::initRowsBeforeLimit()
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
std::vector<LimitTransform *> limits;
std::vector<SourceFromInputStream *> sources;
std::vector<RemoteSource *> remote_sources;
std::unordered_set<IProcessor *> visited;
@ -498,9 +496,6 @@ void QueryPipelineBuilder::initRowsBeforeLimit()
limits.emplace_back(limit);
}
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
sources.emplace_back(source);
if (auto * source = typeid_cast<RemoteSource *>(processor))
remote_sources.emplace_back(source);
}
@ -533,16 +528,13 @@ void QueryPipelineBuilder::initRowsBeforeLimit()
}
}
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty() || !remote_sources.empty()))
if (!rows_before_limit_at_least && (!limits.empty() || !remote_sources.empty()))
{
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
for (auto & limit : limits)
limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : remote_sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/IProcessor.h>
#include <Processors/Pipe.h>

View File

@ -4,7 +4,6 @@
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Interpreters/JoinSwitcher.h>
#include <Common/JSONBuilder.h>

View File

@ -1,195 +0,0 @@
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
SourceFromInputStream::SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_)
: ISourceWithProgress(stream_->getHeader())
, force_add_aggregating_info(force_add_aggregating_info_)
, stream(std::move(stream_))
{
init();
}
void SourceFromInputStream::init()
{
const auto & sample = getPort().getHeader();
for (auto & type : sample.getDataTypes())
if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
has_aggregate_functions = true;
}
void SourceFromInputStream::addTotalsPort()
{
if (totals_port)
throw Exception("Totals port was already added for SourceFromInputStream.", ErrorCodes::LOGICAL_ERROR);
outputs.emplace_back(outputs.front().getHeader(), this);
totals_port = &outputs.back();
}
void SourceFromInputStream::addExtremesPort()
{
if (extremes_port)
throw Exception("Extremes port was already added for SourceFromInputStream.", ErrorCodes::LOGICAL_ERROR);
outputs.emplace_back(outputs.front().getHeader(), this);
extremes_port = &outputs.back();
}
IProcessor::Status SourceFromInputStream::prepare()
{
auto status = ISource::prepare();
if (status == Status::Finished)
{
is_generating_finished = true;
/// Read postfix and get totals if needed.
if (!is_stream_finished && !isCancelled())
return Status::Ready;
if (totals_port && !totals_port->isFinished())
{
if (has_totals)
{
if (!totals_port->canPush())
return Status::PortFull;
totals_port->push(std::move(totals));
has_totals = false;
}
totals_port->finish();
}
if (extremes_port && !extremes_port->isFinished())
{
if (has_extremes)
{
if (!extremes_port->canPush())
return Status::PortFull;
extremes_port->push(std::move(extremes));
has_extremes = false;
}
extremes_port->finish();
}
}
return status;
}
void SourceFromInputStream::work()
{
if (!is_generating_finished)
{
try
{
ISource::work();
}
catch (...)
{
/// Won't read suffix in case of exception.
is_stream_finished = true;
throw;
}
return;
}
if (is_stream_finished)
return;
if (rows_before_limit)
{
const auto & info = stream->getProfileInfo();
if (info.hasAppliedLimit())
rows_before_limit->add(info.getRowsBeforeLimit());
}
stream->readSuffix();
if (auto totals_block = stream->getTotals())
{
totals.setColumns(totals_block.getColumns(), 1);
has_totals = true;
}
is_stream_finished = true;
}
Chunk SourceFromInputStream::generate()
{
if (is_stream_finished)
return {};
if (!is_stream_started)
{
stream->readPrefix();
is_stream_started = true;
}
auto block = stream->read();
if (!block && !isCancelled())
{
if (rows_before_limit)
{
const auto & info = stream->getProfileInfo();
if (info.hasAppliedLimit())
rows_before_limit->add(info.getRowsBeforeLimit());
}
stream->readSuffix();
if (auto totals_block = stream->getTotals())
{
if (totals_block.rows() > 0) /// Sometimes we can get empty totals. Skip it.
{
totals.setColumns(totals_block.getColumns(), totals_block.rows());
has_totals = true;
}
}
if (auto extremes_block = stream->getExtremes())
{
if (extremes_block.rows() > 0) /// Sometimes we can get empty extremes. Skip it.
{
extremes.setColumns(extremes_block.getColumns(), extremes_block.rows());
has_extremes = true;
}
}
is_stream_finished = true;
return {};
}
if (isCancelled())
return {};
#ifndef NDEBUG
assertBlocksHaveEqualStructure(getPort().getHeader(), block, "SourceFromInputStream");
#endif
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
if (force_add_aggregating_info || has_aggregate_functions)
{
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
chunk.setChunkInfo(std::move(info));
}
return chunk;
}
}
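
SourceFromInputStream grew its totals and extremes ports by hand in addTotalsPort()/addExtremesPort(); a processor-based source now hands those ports to the Pipe constructor seen in the Pipe.cpp hunk above. A hedged sketch, where source and the three port pointers are illustrative assumptions:

    Pipe pipe(
        std::move(source),  /// ProcessorPtr with the ports below among its outputs
        output_port,        /// OutputPort * for the main data stream
        totals_port,        /// OutputPort *, may be nullptr
        extremes_port);     /// OutputPort *, may be nullptr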

View File

@ -1,77 +0,0 @@
#pragma once
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
/// Wrapper for IBlockInputStream which implements ISourceWithProgress.
class SourceFromInputStream : public ISourceWithProgress
{
public:
/// If force_add_aggregating_info is enabled, AggregatedChunkInfo (with bucket number and is_overflows flag) will be added to result chunk.
explicit SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_ = false);
String getName() const override { return "SourceFromInputStream"; }
Status prepare() override;
void work() override;
Chunk generate() override;
BlockInputStreamPtr & getStream() { return stream; }
void addTotalsPort();
void addExtremesPort();
OutputPort * getTotalsPort() const { return totals_port; }
OutputPort * getExtremesPort() const { return extremes_port; }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
/// Implementation for methods from ISourceWithProgress.
void setLimits(const StreamLocalLimits & limits_) final { stream->setLimits(limits_); }
void setLeafLimits(const SizeLimits &) final { }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { stream->setQuota(quota_); }
void setProcessListElement(QueryStatus * elem) final { stream->setProcessListElement(elem); }
void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }
void addTotalRowsApprox(size_t value) final { stream->addTotalRowsApprox(value); }
/// Stop reading from stream if output port is finished.
void onUpdatePorts() override
{
if (getPort().isFinished())
cancel();
}
protected:
void onCancel() override { stream->cancel(false); }
private:
bool has_aggregate_functions = false;
bool force_add_aggregating_info = false;
BlockInputStreamPtr stream;
RowsBeforeLimitCounterPtr rows_before_limit;
Chunk totals;
OutputPort * totals_port = nullptr;
bool has_totals = false;
Chunk extremes;
OutputPort * extremes_port = nullptr;
bool has_extremes = false;
bool is_generating_finished = false;
bool is_stream_finished = false;
bool is_stream_started = false;
void init();
};
}
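
With the wrapper gone, a pipeline that used to start from an IBlockInputStream starts from any processor-based source instead. A minimal sketch using SourceFromSingleChunk, which this commit already includes elsewhere; block is an assumed ready-made Block:

    #include <Processors/Pipe.h>
    #include <Processors/QueryPipeline.h>
    #include <Processors/Sources/SourceFromSingleChunk.h>

    /// Any ISource works the same way; SourceFromSingleChunk just needs a header and one chunk.
    auto source = std::make_shared<SourceFromSingleChunk>(
        block.cloneEmpty(), Chunk(block.getColumns(), block.rows()));
    QueryPipeline pipeline(Pipe(std::move(source)));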

View File

@ -24,9 +24,6 @@ public:
Int32 bucket_num = -1;
};
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using AggregatorList = std::list<Aggregator>;
using AggregatorListPtr = std::shared_ptr<AggregatorList>;

View File

@ -1,7 +1,6 @@
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Set.h>
#include <Interpreters/IJoin.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/SubqueryForSet.h>
#include <Processors/IAccumulatingTransform.h>

View File

@ -1,6 +1,5 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/QueryViewsLog.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/Chain.h>

View File

@ -7,7 +7,6 @@
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/copyData.h>
#include <Interpreters/Session.h>
#include <Interpreters/executeQuery.h>
#include <IO/copyData.h>

View File

@ -13,7 +13,6 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/Context_fwd.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/IBlockStream_fwd.h>
#include "IServer.h"

View File

@ -13,7 +13,6 @@
#include <IO/WriteHelpers.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Sinks/SinkToStorage.h>
@ -23,6 +22,9 @@
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Pipe.h>
#include <filesystem>
@ -124,12 +126,13 @@ public:
auto compression = chooseCompressionMethod(path, compression_method);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression);
auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size);
pipeline = QueryPipeline(std::move(input_format));
reader = std::make_shared<InputStreamFromInputFormat>(input_format);
reader->readPrefix();
reader = std::make_unique<PullingPipelineExecutor>(pipeline);
}
if (auto res = reader->read())
Block res;
if (reader->pull(res))
{
Columns columns = res.getColumns();
UInt64 num_rows = res.rows();
@ -153,15 +156,16 @@ public:
return Chunk(std::move(columns), num_rows);
}
reader->readSuffix();
reader.reset();
pipeline.reset();
read_buf.reset();
}
}
private:
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
QueryPipeline pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
SourcesInfoPtr source_info;
String uri;
String format;

View File

@ -2,7 +2,6 @@
#include <Core/Names.h>
#include <Core/QueryProcessingStage.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Databases/IDatabase.h>
#include <Interpreters/CancellationCode.h>
#include <Interpreters/Context_fwd.h>

View File

@ -1,9 +1,7 @@
#include <Storages/Kafka/KafkaSource.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <base/logger_useful.h>
#include <Interpreters/Context.h>

View File

@ -1,7 +1,6 @@
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/parseSyslogLevel.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -16,7 +15,6 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaSource.h>

View File

@ -21,7 +21,6 @@ limitations under the License. */
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <DataStreams/copyData.h>
#include <base/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/SipHash.h>

View File

@ -5,7 +5,6 @@
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h>

View File

@ -19,7 +19,6 @@
#include "Processors/Merges/GraphiteRollupSortedTransform.h"
#include "Processors/Merges/AggregatingSortedTransform.h"
#include "Processors/Merges/VersionedCollapsingTransform.h"
#include "Processors/Executors/PipelineExecutingBlockInputStream.h"
#include "DataStreams/TTLBlockInputStream.h"
#include <DataStreams/TTLCalcInputStream.h>
#include <Processors/Transforms/DistinctSortedTransform.h>

View File

@ -4,7 +4,6 @@
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/IBackup.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
@ -35,7 +34,6 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/AlterCommands.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
@ -56,6 +54,7 @@
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Formats/IInputFormat.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <boost/range/adaptor/filtered.hpp>
@ -3664,9 +3663,12 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
buf,
metadata_snapshot->getPartitionKey().sample_block,
local_context->getSettingsRef().max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
auto block = input_stream->read();
QueryPipeline pipeline(std::move(input_format));
PullingPipelineExecutor executor(pipeline);
Block block;
executor.pull(block);
if (!block || !block.rows())
throw Exception(
"Could not parse partition value: `" + partition_ast.fields_str + "`",

View File

@ -28,7 +28,6 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/Context.h>
#include <Common/interpolate.h>

View File

@ -6,7 +6,6 @@
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h>

View File

@ -13,8 +13,6 @@
#include <IO/WriteHelpers.h>
#include <Common/typeid_cast.h>
#include <DataStreams/ITTLAlgorithm.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Parsers/queryToString.h>

View File

@ -4,9 +4,9 @@
#include <Common/escapeForFileName.h>
#include <DataStreams/TTLBlockInputStream.h>
#include <DataStreams/TTLCalcInputStream.h>
#include <DataStreams/SquashingTransform.h>
#include <Processors/Transforms/DistinctSortedTransform.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/ExpressionTransform.h>

View File

@ -2,9 +2,9 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/Block.h>
#include <IO/Operators.h>

View File

@ -3,8 +3,6 @@
#include "StorageMaterializedPostgreSQL.h"
#include <Columns/ColumnNullable.h>
#include <Common/hex.h>
#include <DataStreams/copyData.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>

View File

@ -7,7 +7,6 @@
#include <Core/Names.h>
#include <base/logger_useful.h>
#include <Storages/IStorage.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Parsers/ASTExpressionList.h>

View File

@ -9,7 +9,6 @@
#include <Interpreters/InterpreterRenameQuery.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
#include <DataStreams/copyData.h>
#include <Databases/DatabaseOnDisk.h>

View File

@ -14,7 +14,6 @@
#include <Formats/FormatSettings.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>

View File

@ -2,7 +2,6 @@
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>

View File

@ -1,5 +1,4 @@
#include <amqpcpp.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
@ -13,7 +12,6 @@
#include <Parsers/ASTLiteral.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/RabbitMQ/RabbitMQSink.h>

View File

@ -17,8 +17,8 @@
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>

View File

@ -24,8 +24,8 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>

View File

@ -2,7 +2,6 @@
#include <Core/BackgroundSchedulePool.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage.h>
#include <base/shared_ptr_helper.h>

View File

@ -8,7 +8,6 @@
#include <Interpreters/ExternalLoaderDictionaryStorageConfigRepository.h>
#include <Parsers/ASTLiteral.h>
#include <Common/quoteString.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <IO/Operators.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>

View File

@ -59,7 +59,6 @@
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/EmptySink.h>
#include <Core/Field.h>

View File

@ -11,7 +11,6 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Pipe.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>

View File

@ -16,7 +16,6 @@
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
@ -34,8 +33,8 @@
#include <filesystem>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
@ -479,8 +478,6 @@ Pipe StorageFile::read(
size_t max_block_size,
unsigned num_streams)
{
BlockInputStreams blocks_input;
if (use_table_fd) /// need to call ctor of BlockInputStream
paths = {""}; /// when use fd, paths are empty
else

Some files were not shown because too many files have changed in this diff.