Remove DataStreams folder.

This commit is contained in:
Nikolai Kochetov 2021-10-15 23:18:20 +03:00
parent ad8a344b46
commit fd14faeae2
161 changed files with 311 additions and 337 deletions

View File

@ -28,7 +28,7 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/UseSSL.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
@ -432,7 +432,7 @@ private:
Progress progress;
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
BlockStreamProfileInfo info;
ProfileInfo info;
while (Block block = executor.read())
info.update(block);

View File

@ -49,7 +49,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>

View File

@ -62,7 +62,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <DataStreams/ConnectionCollector.h>
#include <QueryPipeline/ConnectionCollector.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Common/Config/ConfigReloader.h>

View File

@ -1,6 +1,6 @@
#include "LibraryBridgeHelper.h"
#include <DataStreams/formatBlock.h>
#include <Formats/formatBlock.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Processors/Pipe.h>
#include <Processors/Formats/IInputFormat.h>

View File

@ -49,7 +49,7 @@ add_subdirectory (Backups)
add_subdirectory (Columns)
add_subdirectory (Common)
add_subdirectory (Core)
add_subdirectory (DataStreams)
add_subdirectory (QueryPipeline)
add_subdirectory (DataTypes)
add_subdirectory (Dictionaries)
add_subdirectory (Disks)
@ -185,7 +185,7 @@ add_object_library(clickhouse_backups Backups)
add_object_library(clickhouse_core Core)
add_object_library(clickhouse_core_mysql Core/MySQL)
add_object_library(clickhouse_compression Compression)
add_object_library(clickhouse_datastreams DataStreams)
add_object_library(clickhouse_querypipeline QueryPipeline)
add_object_library(clickhouse_datatypes DataTypes)
add_object_library(clickhouse_datatypes_serializations DataTypes/Serializations)
add_object_library(clickhouse_databases Databases)

View File

@ -47,8 +47,7 @@
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/CompressionMethod.h>
#include <DataStreams/InternalTextLogs.h>
#include <DataStreams/materializeBlock.h>
#include <Client/InternalTextLogs.h>
namespace fs = std::filesystem;
@ -284,7 +283,7 @@ void ClientBase::onReceiveExceptionFromServer(std::unique_ptr<Exception> && e)
}
void ClientBase::onProfileInfo(const BlockStreamProfileInfo & profile_info)
void ClientBase::onProfileInfo(const ProfileInfo & profile_info)
{
if (profile_info.hasAppliedLimit() && output_format)
output_format->setRowsBeforeLimit(profile_info.getRowsBeforeLimit());

View File

@ -112,7 +112,7 @@ private:
void onTotals(Block & block, ASTPtr parsed_query);
void onExtremes(Block & block, ASTPtr parsed_query);
void onReceiveExceptionFromServer(std::unique_ptr<Exception> && e);
void onProfileInfo(const BlockStreamProfileInfo & profile_info);
void onProfileInfo(const ProfileInfo & profile_info);
void onEndOfStream();
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);

View File

@ -9,8 +9,8 @@
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <IO/TimeoutSetter.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Client/Connection.h>
#include <Client/ConnectionParameters.h>
#include <Common/ClickHouseRevision.h>
@ -994,9 +994,9 @@ Progress Connection::receiveProgress() const
}
BlockStreamProfileInfo Connection::receiveProfileInfo() const
ProfileInfo Connection::receiveProfileInfo() const
{
BlockStreamProfileInfo profile_info;
ProfileInfo profile_info;
profile_info.read(*in);
return profile_info;
}

View File

@ -253,7 +253,7 @@ private:
std::vector<String> receiveMultistringMessage(UInt64 msg_type) const;
std::unique_ptr<Exception> receiveException() const;
Progress receiveProgress() const;
BlockStreamProfileInfo receiveProfileInfo() const;
ProfileInfo receiveProfileInfo() const;
void initInputBuffers();
void initBlockInput();

View File

@ -6,7 +6,7 @@
#include <Core/Block.h>
#include <Core/Protocol.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <QueryPipeline/ProfileInfo.h>
#include <Processors/Pipe.h>
#include <IO/ConnectionTimeouts.h>
@ -30,7 +30,7 @@ struct Packet
std::unique_ptr<Exception> exception;
std::vector<String> multistring_message;
Progress progress;
BlockStreamProfileInfo profile_info;
ProfileInfo profile_info;
std::vector<UUID> part_uuids;
Packet() : type(Protocol::Server::Hello) {}

View File

@ -1,4 +1,4 @@
#include "InternalTextLogs.h"
#include <Client/InternalTextLogs.h>
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/typeid_cast.h>

View File

@ -2,7 +2,7 @@
#include "Connection.h"
#include <Interpreters/Context.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>

View File

@ -1,4 +1,4 @@
#include <DataStreams/SquashingTransform.h>
#include <Common/SquashingTransform.h>
#include <iostream>

View File

@ -707,4 +707,27 @@ ColumnPtr getColumnFromBlock(const Block & block, const NameAndTypePair & column
return current_column;
}
Block materializeBlock(const Block & block)
{
if (!block)
return block;
Block res = block;
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
{
auto & element = res.getByPosition(i);
element.column = element.column->convertToFullColumnIfConst();
}
return res;
}
void materializeBlockInplace(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->convertToFullColumnIfConst();
}
}

View File

@ -196,4 +196,8 @@ void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out
/// Properly handles cases, when column is a subcolumn and when it is compressed.
ColumnPtr getColumnFromBlock(const Block & block, const NameAndTypePair & column);
/// Converts columns-constants to full columns ("materializes" them).
Block materializeBlock(const Block & block);
void materializeBlockInplace(Block & block);
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Core/SettingsFields.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Formats/FormatSettings.h>

View File

@ -1,27 +0,0 @@
#include <DataStreams/finalizeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/typeid_cast.h>
namespace DB
{
void finalizeBlock(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.getByPosition(i);
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_type)
{
current.type = unfinalized_type->getReturnType();
if (current.column)
{
auto mut_column = IColumn::mutate(std::move(current.column));
current.column = ColumnAggregateFunction::convertToValues(std::move(mut_column));
}
}
}
}
}

View File

@ -1,9 +0,0 @@
#pragma once
#include <Core/Block.h>
namespace DB
{
/// Converts aggregate function columns with non-finalized states to final values
void finalizeBlock(Block & block);
}

View File

@ -1,29 +0,0 @@
#include <DataStreams/materializeBlock.h>
namespace DB
{
Block materializeBlock(const Block & block)
{
if (!block)
return block;
Block res = block;
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
{
auto & element = res.getByPosition(i);
element.column = element.column->convertToFullColumnIfConst();
}
return res;
}
void materializeBlockInplace(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->convertToFullColumnIfConst();
}
}

View File

@ -1,14 +0,0 @@
#pragma once
#include <Core/Block.h>
namespace DB
{
/** Converts columns-constants to full columns ("materializes" them).
*/
Block materializeBlock(const Block & block);
void materializeBlockInplace(Block & block);
}

View File

@ -4,7 +4,7 @@
#include <Databases/DatabaseReplicatedSettings.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/Context.h>

View File

@ -11,7 +11,7 @@
# include <DataTypes/convertMySQLDataType.h>
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <Formats/MySQLSource.h>
# include <Processors/Sources/MySQLSource.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/QueryPipelineBuilder.h>
# include <IO/Operators.h>

View File

@ -10,7 +10,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Formats/MySQLSource.h>
#include <Processors/Sources/MySQLSource.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>

View File

@ -5,7 +5,7 @@
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/MySQLSource.h>
#include <Processors/Sources/MySQLSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipelineBuilder.h>
#include <IO/ReadBufferFromFile.h>

View File

@ -16,7 +16,7 @@
# include <Processors/Transforms/CountingTransform.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLSource.h>
# include <Processors/Sources/MySQLSource.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/executeQuery.h>

View File

@ -8,7 +8,7 @@
# include <mutex>
# include <Core/MySQL/MySQLClient.h>
# include <DataStreams/BlockIO.h>
# include <QueryPipeline/BlockIO.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseOrdinary.h>

View File

@ -2,7 +2,7 @@
#include <memory>
#include <Client/ConnectionPool.h>
#include <Processors/Sources/RemoteSource.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Transforms/ExpressionTransform.h>

View File

@ -4,8 +4,8 @@
#include <base/LocalDateTime.h>
#include <Common/ShellCommand.h>
#include <DataStreams/ShellCommandSource.h>
#include <DataStreams/formatBlock.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Formats/formatBlock.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>

View File

@ -4,7 +4,7 @@
#include <base/LocalDateTime.h>
#include <Common/ShellCommand.h>
#include <DataStreams/formatBlock.h>
#include <Formats/formatBlock.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>

View File

@ -7,7 +7,7 @@
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <DataStreams/ShellCommandSource.h>
#include <Processors/Sources/ShellCommandSource.h>
namespace DB

View File

@ -1,5 +1,5 @@
#include "HTTPDictionarySource.h"
#include <DataStreams/formatBlock.h>
#include <Formats/formatBlock.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ReadWriteBufferFromHTTP.h>

View File

@ -12,7 +12,7 @@
# include "DictionaryStructure.h"
# include "ExternalQueryBuilder.h"
# include "IDictionarySource.h"
# include <Formats/MySQLSource.h>
# include <Processors/Sources/MySQLSource.h>
namespace Poco
{

View File

@ -8,7 +8,7 @@
#include <Common/typeid_cast.h>
#include <base/range.h>
#include <DataStreams/NativeReader.h>
#include <Formats/NativeReader.h>
#include <DataTypes/DataTypeLowCardinality.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/MarkInCompressedFile.h>
#include <Formats/MarkInCompressedFile.h>
#include <Common/PODArray.h>
#include <Core/Block.h>

View File

@ -5,8 +5,8 @@
#include <IO/VarInt.h>
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <DataStreams/NativeWriter.h>
#include <Formats/MarkInCompressedFile.h>
#include <Formats/NativeWriter.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>

View File

@ -1,6 +1,6 @@
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <Formats/TemporaryFileStream.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Compression/CompressedWriteBuffer.h>
@ -41,27 +41,4 @@ void TemporaryFileStream::write(const std::string & path, const Block & header,
compressed_buf.finalize();
}
TemporaryFileLazySource::TemporaryFileLazySource(const std::string & path_, const Block & header_)
: ISource(header_)
, path(path_)
, done(false)
{}
Chunk TemporaryFileLazySource::generate()
{
if (done)
return {};
if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header);
auto block = stream->block_in->read();
if (!block)
{
done = true;
stream.reset();
}
return Chunk(block.getColumns(), block.rows());
}
}

View File

@ -4,7 +4,7 @@
#include <Processors/QueryPipelineBuilder.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <DataStreams/NativeReader.h>
#include <Formats/NativeReader.h>
namespace DB
{
@ -23,22 +23,4 @@ struct TemporaryFileStream
static void write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec);
};
class TemporaryFileLazySource : public ISource
{
public:
TemporaryFileLazySource(const std::string & path_, const Block & header_);
String getName() const override { return "TemporaryFileLazySource"; }
protected:
Chunk generate() override;
private:
const std::string path;
Block header;
bool done;
std::unique_ptr<TemporaryFileStream> stream;
};
}

View File

@ -1,5 +1,5 @@
#include <Core/Block.h>
#include <DataStreams/formatBlock.h>
#include <Formats/formatBlock.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/QueryPipeline.h>

View File

@ -1,6 +1,5 @@
#include <memory>
#include <Columns/ColumnString.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionFactory.h>

View File

@ -9,8 +9,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <DataStreams/NativeWriter.h>
#include <DataStreams/materializeBlock.h>
#include <Formats/NativeWriter.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h>

View File

@ -19,7 +19,7 @@
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/SingleDiskVolume.h>

View File

@ -1,7 +1,7 @@
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Core/Settings.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>

View File

@ -2,7 +2,6 @@
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnNullable.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/WriteHelpers.h>

View File

@ -23,7 +23,6 @@
#include <Storages/StorageDictionary.h>
#include <DataStreams/materializeBlock.h>
#include <Core/ColumnNumbers.h>
#include <Common/typeid_cast.h>

View File

@ -20,7 +20,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Core/Block.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>

View File

@ -1,6 +1,6 @@
#include <Storages/IStorage.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/queryToString.h>
#include <Common/typeid_cast.h>

View File

@ -1,7 +1,7 @@
#include <Storages/IStorage.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h>

View File

@ -1,6 +1,6 @@
#include <Interpreters/InterpreterExplainQuery.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InDepthNodeVisitor.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/StorageInMemoryMetadata.h>

View File

@ -1,5 +1,3 @@
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Parsers/ASTFunction.h>

View File

@ -2,7 +2,7 @@
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Parsers/formatAST.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>

View File

@ -15,7 +15,7 @@ limitations under the License. */
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/Context.h>
#include <Access/AccessFlags.h>
#include <DataStreams/StreamLocalLimits.h>
#include <QueryPipeline/StreamLocalLimits.h>
namespace DB

View File

@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>

View File

@ -4,8 +4,7 @@
#include <Columns/ColumnLowCardinality.h>
#include <Core/SortCursor.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/materializeBlock.h>
#include <Formats/TemporaryFileStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/MergeJoin.h>

View File

@ -7,7 +7,7 @@
#include <Core/SortDescription.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/SortedBlocksWriter.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{

View File

@ -1,7 +1,7 @@
#pragma once
#include <Core/Defines.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <IO/Progress.h>
#include <Interpreters/CancellationCode.h>
#include <Interpreters/ClientInfo.h>

View File

@ -2,7 +2,7 @@
#include <shared_mutex>
#include <Core/Block.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/SetVariants.h>
#include <Parsers/IAST.h>

View File

@ -4,8 +4,8 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/materializeBlock.h>
#include <Processors/Sources/TemporaryFileLazySource.h>
#include <Formats/TemporaryFileStream.h>
#include <Disks/IVolume.h>

View File

@ -7,7 +7,7 @@
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Processors/Pipe.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB

View File

@ -7,7 +7,7 @@
#include <Interpreters/IJoin.h>
#include <Interpreters/join_common.h>
#include <Interpreters/asof.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/getLeastSupertype.h>
#include <Storages/IStorage_fwd.h>
#include <Common/Exception.h>

View File

@ -4,8 +4,8 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataStreams/ShellCommandSource.h>
#include <DataStreams/formatBlock.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Formats/formatBlock.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <string>
#include <DataTypes/IDataType.h>
#include <DataStreams/ShellCommandSource.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Interpreters/IExternalLoadable.h>

View File

@ -2,8 +2,8 @@
#include <IO/WriteHelpers.h>
#include <DataStreams/ShellCommandSource.h>
#include <DataStreams/formatBlock.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Formats/formatBlock.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>

View File

@ -9,7 +9,7 @@
#include <IO/LimitReadBuffer.h>
#include <IO/copyData.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
@ -49,7 +49,6 @@
#include <Common/ProfileEvents.h>
#include <Common/SensitiveDataMasker.h>
#include <DataStreams/materializeBlock.h>
#include <IO/CompressionMethod.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/Context_fwd.h>
#include <Formats/FormatSettings.h>

View File

@ -4,8 +4,6 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnConst.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>

View File

@ -18,9 +18,6 @@ Common/UInt128.h
Core/Block.h
Core/Defines.h
Core/Settings.h
DataStreams/PushingToViewsBlockOutputStream.cpp
DataStreams/PushingToViewsBlockOutputStream.h
DataStreams/copyData.cpp
Databases/DatabasesCommon.cpp
IO/WriteBufferValidUTF8.cpp
Interpreters/InterpreterAlterQuery.cpp

View File

@ -225,12 +225,12 @@ Block PullingAsyncPipelineExecutor::getExtremesBlock()
return header.cloneWithColumns(extremes.detachColumns());
}
BlockStreamProfileInfo & PullingAsyncPipelineExecutor::getProfileInfo()
ProfileInfo & PullingAsyncPipelineExecutor::getProfileInfo()
{
if (lazy_format)
return lazy_format->getProfileInfo();
static BlockStreamProfileInfo profile_info;
static ProfileInfo profile_info;
static std::once_flag flag;
/// Calculate rows before limit here to avoid race.
std::call_once(flag, []() { profile_info.getRowsBeforeLimit(); });

View File

@ -8,7 +8,7 @@ class QueryPipeline;
class Block;
class Chunk;
class LazyOutputFormat;
struct BlockStreamProfileInfo;
struct ProfileInfo;
/// Asynchronous pulling executor for QueryPipeline.
/// Always creates extra thread. If query is executed in single thread, use PullingPipelineExecutor.
@ -44,7 +44,7 @@ public:
Block getExtremesBlock();
/// Get query profile info.
BlockStreamProfileInfo & getProfileInfo();
ProfileInfo & getProfileInfo();
/// Internal executor data.
struct Data;

View File

@ -118,7 +118,7 @@ Block PullingPipelineExecutor::getExtremesBlock()
return header.cloneWithColumns(extremes.detachColumns());
}
BlockStreamProfileInfo & PullingPipelineExecutor::getProfileInfo()
ProfileInfo & PullingPipelineExecutor::getProfileInfo()
{
return pulling_format->getProfileInfo();
}

View File

@ -10,7 +10,7 @@ class Chunk;
class QueryPipeline;
class PipelineExecutor;
class PullingOutputFormat;
struct BlockStreamProfileInfo;
struct ProfileInfo;
using PipelineExecutorPtr = std::shared_ptr<PipelineExecutor>;
@ -46,7 +46,7 @@ public:
Block getExtremesBlock();
/// Get query profile info.
BlockStreamProfileInfo & getProfileInfo();
ProfileInfo & getProfileInfo();
private:
std::atomic_bool has_data_flag = false;

View File

@ -3,7 +3,7 @@
#include <string>
#include <Columns/IColumn.h>
#include <Processors/Formats/IInputFormat.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Poco/Timespan.h>
class Stopwatch;

View File

@ -1,5 +1,5 @@
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <QueryPipeline/ProfileInfo.h>
#include <IO/WriteBuffer.h>
namespace DB
@ -25,7 +25,7 @@ public:
bool isFinished() { return finished_processing && queue.size() == 0; }
BlockStreamProfileInfo & getProfileInfo() { return info; }
ProfileInfo & getProfileInfo() { return info; }
void setRowsBeforeLimit(size_t rows_before_limit) override;
@ -65,7 +65,7 @@ private:
/// Is not used.
static WriteBuffer out;
BlockStreamProfileInfo info;
ProfileInfo info;
std::atomic<bool> finished_processing;
};

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <QueryPipeline/ProfileInfo.h>
namespace DB
{
@ -20,7 +20,7 @@ public:
Chunk getTotals();
Chunk getExtremes();
BlockStreamProfileInfo & getProfileInfo() { return info; }
ProfileInfo & getProfileInfo() { return info; }
void setRowsBeforeLimit(size_t rows_before_limit) override;
@ -38,7 +38,7 @@ private:
std::atomic_bool & has_data_flag;
BlockStreamProfileInfo info;
ProfileInfo info;
/// Is not used.
static WriteBuffer out;

View File

@ -4,7 +4,7 @@
#include <Processors/PipelineResourcesHolder.h>
#include <Processors/Chain.h>
#include <Access/EnabledQuota.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/Aggregator.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/Context_fwd.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Interpreters/Aggregator.h>
namespace DB

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{

View File

@ -2,7 +2,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/formatAST.h>
#include <Processors/Sources/RemoteSource.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{

View File

@ -3,7 +3,7 @@
#include <Interpreters/Context_fwd.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Storages/TableLockHolder.h>
#include <DataStreams/StreamLocalLimits.h>
#include <QueryPipeline/StreamLocalLimits.h>
namespace DB
{

View File

@ -0,0 +1,27 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <QueryPipeline/RemoteInserter.h>
namespace DB
{
class RemoteSink final : public RemoteInserter, public SinkToStorage
{
public:
explicit RemoteSink(
Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings & settings_,
const ClientInfo & client_info_)
: RemoteInserter(connection_, timeouts, query_, settings_, client_info_)
, SinkToStorage(RemoteInserter::getHeader())
{
}
String getName() const override { return "RemoteSink"; }
void consume (Chunk chunk) override { write(RemoteInserter::getHeader().cloneWithColumns(chunk.detachColumns())); }
void onFinish() override { RemoteInserter::onFinish(); }
};
}

View File

@ -19,7 +19,7 @@
#include <Common/assert_cast.h>
#include <base/range.h>
#include <base/logger_useful.h>
#include "MySQLSource.h"
#include <Processors/Sources/MySQLSource.h>
namespace DB

View File

@ -1,6 +1,6 @@
#include <Processors/Sources/RemoteSource.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/RemoteQueryExecutorReadContext.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -56,7 +56,7 @@ std::optional<Chunk> RemoteSource::tryGenerate()
query_executor->setProgressCallback([this](const Progress & value) { progress(value); });
/// Get rows_before_limit result for remote query from ProfileInfo packet.
query_executor->setProfileInfoCallback([this](const BlockStreamProfileInfo & info)
query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
{
if (rows_before_limit && info.hasAppliedLimit())
rows_before_limit->set(info.getRowsBeforeLimit());

Some files were not shown because too many files have changed in this diff Show More