Merge pull request #26758 from ClickHouse/output-streams-to-processors

Remove some output streams
This commit is contained in:
Nikolai Kochetov 2021-07-27 17:19:26 +03:00 committed by GitHub
commit 97bc754dd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
93 changed files with 666 additions and 481 deletions

View File

@ -43,6 +43,7 @@ SRCS(
SettingsProfile.cpp
SettingsProfileElement.cpp
SettingsProfilesCache.cpp
SettingsProfilesInfo.cpp
User.cpp
UsersConfigAccessStorage.cpp
tests/gtest_access_rights_ops.cpp

View File

@ -12,9 +12,11 @@ PEERDIR(
SRCS(
Connection.cpp
ConnectionEstablisher.cpp
ConnectionPool.cpp
ConnectionPoolWithFailover.cpp
HedgedConnections.cpp
HedgedConnectionsFactory.cpp
IConnections.cpp
MultiplexedConnections.cpp
)

View File

@ -15,6 +15,7 @@
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
@ -159,12 +160,11 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
auto temporary_table = TemporaryTableHolder(getContext(), ColumnsDescription{columns}, {});
auto storage = temporary_table.getTable();
getContext()->addExternalTable(data->table_name, std::move(temporary_table));
BlockOutputStreamPtr output = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), getContext());
auto sink = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), getContext());
/// Write data
data->pipe->resize(1);
auto sink = std::make_shared<SinkToOutputStream>(std::move(output));
connect(*data->pipe->getOutputPort(0), sink->getPort());
auto processors = Pipe::detachProcessors(std::move(*data->pipe));

View File

@ -0,0 +1,114 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Adapter that exposes a SinkToStorage processor through the
/// IBlockOutputStream interface. It pushes blocks into the sink's input
/// port and manually drives the sink's state machine (prepare()/work())
/// until the sink asks for more data or finishes.
class PushingToSinkBlockOutputStream : public IBlockOutputStream
{
public:
    explicit PushingToSinkBlockOutputStream(SinkToStoragePtr sink_)
        : sink(std::move(sink_)), port(sink->getPort().getHeader(), sink.get()) {}

    Block getHeader() const override { return sink->getPort().getHeader(); }

    /// Push one block into the sink and run the sink until it consumes it.
    void write(const Block & block) override
    {
        /// In case writePrefix was not called.
        if (!port.isConnected())
            writePrefix();

        if (!block)
            return;

        size_t num_rows = block.rows();
        Chunk chunk(block.getColumns(), num_rows);
        port.push(std::move(chunk));

        while (true)
        {
            auto status = sink->prepare();
            switch (status)
            {
                case IProcessor::Status::Ready:
                    sink->work();
                    continue;
                case IProcessor::Status::NeedData:
                    /// The pushed chunk was consumed; ready for the next block.
                    return;
                case IProcessor::Status::Async: [[fallthrough]];
                case IProcessor::Status::ExpandPipeline: [[fallthrough]];
                case IProcessor::Status::Finished: [[fallthrough]];
                case IProcessor::Status::PortFull:
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Status {} is not expected in PushingToSinkBlockOutputStream::write",
                        IProcessor::statusToName(status));
            }
        }
    }

    /// Connect our output port to the sink's input and run the sink until it
    /// requests data (lets the sink perform its start-up work).
    void writePrefix() override
    {
        connect(port, sink->getPort());

        while (true)
        {
            auto status = sink->prepare();
            switch (status)
            {
                case IProcessor::Status::Ready:
                    sink->work();
                    continue;
                case IProcessor::Status::NeedData:
                    return;
                case IProcessor::Status::Async: [[fallthrough]];
                case IProcessor::Status::ExpandPipeline: [[fallthrough]];
                case IProcessor::Status::Finished: [[fallthrough]];
                case IProcessor::Status::PortFull:
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Status {} is not expected in PushingToSinkBlockOutputStream::writePrefix",
                        IProcessor::statusToName(status));
            }
        }
    }

    /// Mark the port finished and run the sink to completion
    /// (lets the sink perform its finalization work).
    void writeSuffix() override
    {
        port.finish();

        while (true)
        {
            auto status = sink->prepare();
            switch (status)
            {
                case IProcessor::Status::Ready:
                    sink->work();
                    continue;
                case IProcessor::Status::Finished:
                    return;
                case IProcessor::Status::NeedData:
                case IProcessor::Status::Async:
                case IProcessor::Status::ExpandPipeline:
                case IProcessor::Status::PortFull:
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Status {} is not expected in PushingToSinkBlockOutputStream::writeSuffix",
                        IProcessor::statusToName(status));
            }
        }
    }

private:
    SinkToStoragePtr sink;
    OutputPort port;
};
}

View File

@ -13,12 +13,12 @@
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <Common/checkStackSize.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/StorageValues.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/StorageMaterializedView.h>
#include <common/logger_useful.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
namespace DB
{
@ -127,8 +127,12 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// Do not push to destination table if the flag is set
if (!no_destination)
{
output = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), getContext());
replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get());
auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), getContext());
metadata_snapshot->check(sink->getPort().getHeader().getColumnsWithTypeAndName());
replicated_output = dynamic_cast<ReplicatedMergeTreeSink *>(sink.get());
output = std::make_shared<PushingToSinkBlockOutputStream>(std::move(sink));
}
}

View File

@ -13,7 +13,7 @@ class Logger;
namespace DB
{
class ReplicatedMergeTreeBlockOutputStream;
class ReplicatedMergeTreeSink;
/** Writes data to the specified table and to all dependent materialized views.
*/
@ -38,7 +38,7 @@ private:
StoragePtr storage;
StorageMetadataPtr metadata_snapshot;
BlockOutputStreamPtr output;
ReplicatedMergeTreeBlockOutputStream * replicated_output = nullptr;
ReplicatedMergeTreeSink * replicated_output = nullptr;
Poco::Logger * log;
ASTPtr query_ptr;

View File

@ -19,6 +19,7 @@ SRCS(
BlockStreamProfileInfo.cpp
CheckConstraintsBlockOutputStream.cpp
ColumnGathererStream.cpp
ConnectionCollector.cpp
ConvertingBlockInputStream.cpp
CountingBlockOutputStream.cpp
DistinctSortedBlockInputStream.cpp

View File

@ -232,6 +232,8 @@ SRCS(
countSubstringsCaseInsensitive.cpp
countSubstringsCaseInsensitiveUTF8.cpp
currentDatabase.cpp
currentProfiles.cpp
currentRoles.cpp
currentUser.cpp
dateDiff.cpp
dateName.cpp
@ -319,6 +321,7 @@ SRCS(
ilike.cpp
in.cpp
indexHint.cpp
initialQueryID.cpp
initializeAggregation.cpp
intDiv.cpp
intDivOrZero.cpp
@ -412,6 +415,7 @@ SRCS(
positionCaseInsensitiveUTF8.cpp
positionUTF8.cpp
pow.cpp
queryID.cpp
rand.cpp
rand64.cpp
randConstant.cpp

View File

@ -43,6 +43,7 @@ SRCS(
MySQLPacketPayloadReadBuffer.cpp
MySQLPacketPayloadWriteBuffer.cpp
NullWriteBuffer.cpp
OpenedFile.cpp
PeekableReadBuffer.cpp
Progress.cpp
ReadBufferFromEncryptedFile.cpp

View File

@ -15,7 +15,8 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/typeid_cast.h>
namespace DB
@ -150,14 +151,13 @@ public:
auto external_table = external_storage_holder->getTable();
auto table_out = external_table->write({}, external_table->getInMemoryMetadataPtr(), getContext());
auto io = interpreter->execute();
PullingPipelineExecutor executor(io.pipeline);
table_out->writePrefix();
Block block;
while (executor.pull(block))
table_out->write(block);
table_out->writeSuffix();
io.pipeline.resize(1);
io.pipeline.setSinks([&](const Block &, Pipe::StreamType) -> ProcessorPtr
{
return table_out;
});
auto executor = io.pipeline.execute();
executor->execute(io.pipeline.getNumStreams());
}
else
{

View File

@ -8,6 +8,7 @@
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
@ -271,7 +272,7 @@ BlockIO InterpreterInsertQuery::execute()
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)
out = table->write(query_ptr, metadata_snapshot, getContext());
out = std::make_shared<PushingToSinkBlockOutputStream>(table->write(query_ptr, metadata_snapshot, getContext()));
else
out = std::make_shared<PushingToViewsBlockOutputStream>(table, metadata_snapshot, getContext(), query_ptr, no_destination);

View File

@ -14,7 +14,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <filesystem>

View File

@ -391,6 +391,9 @@ void PipelineExecutor::finish()
void PipelineExecutor::execute(size_t num_threads)
{
if (num_threads < 1)
num_threads = 1;
try
{
executeImpl(num_threads);

View File

@ -11,12 +11,17 @@ ISink::ISink(Block header)
ISink::Status ISink::prepare()
{
if (!was_on_start_called)
return Status::Ready;
if (has_input)
return Status::Ready;
if (input.isFinished())
{
onFinish();
if (!was_on_finish_called)
return Status::Ready;
return Status::Finished;
}
@ -31,9 +36,21 @@ ISink::Status ISink::prepare()
void ISink::work()
{
consume(std::move(current_chunk));
has_input = false;
if (!was_on_start_called)
{
was_on_start_called = true;
onStart();
}
else if (has_input)
{
has_input = false;
consume(std::move(current_chunk));
}
else if (!was_on_finish_called)
{
was_on_finish_called = true;
onFinish();
}
}
}

View File

@ -12,9 +12,11 @@ protected:
InputPort & input;
Chunk current_chunk;
bool has_input = false;
bool was_on_start_called = false;
bool was_on_finish_called = false;
virtual void consume(Chunk block) = 0;
virtual void onStart() {}
virtual void onFinish() {}
public:

View File

@ -4,7 +4,8 @@
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h>

View File

@ -15,7 +15,7 @@
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/VirtualColumnUtils.h>
@ -157,7 +157,7 @@ Pipe ReadFromMergeTree::readFromPool(
for (size_t i = 0; i < max_streams; ++i)
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
auto source = std::make_shared<MergeTreeThreadSelectProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
@ -662,7 +662,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part and if num_streams > 1 we
/// can use parallel select on such parts. We save such parts in one vector and then use
/// MergeTreeReadPool and MergeTreeThreadSelectBlockInputProcessor for parallel select.
/// MergeTreeReadPool and MergeTreeThreadSelectProcessor for parallel select.
if (num_streams > 1 && settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)

View File

@ -0,0 +1,18 @@
#pragma once
#include <Processors/ISink.h>
namespace DB
{
/// Sink which reads everything and do nothing with it.
class EmptySink : public ISink
{
public:
explicit EmptySink(Block header) : ISink(std::move(header)) {}
String getName() const override { return "EmptySink"; }
protected:
void consume(Chunk) override {}
};
}

View File

@ -1,5 +1,4 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/ISink.h>
namespace DB
@ -21,15 +20,4 @@ protected:
void consume(Chunk) override {}
};
/// Sink which reads everything and do nothing with it.
class EmptySink : public ISink
{
public:
explicit EmptySink(Block header) : ISink(std::move(header)) {}
String getName() const override { return "EmptySink"; }
protected:
void consume(Chunk) override {}
};
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Processors/ISink.h>
#include <Storages/TableLockHolder.h>
namespace DB
{
/// Sink which is returned from Storage::write.
/// The same as ISink, but can also hold table locks
/// to keep the underlying table alive while data is being written.
class SinkToStorage : public ISink
{
public:
    using ISink::ISink;

    /// Retain the given table lock for the lifetime of this sink.
    void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

private:
    std::vector<TableLockHolder> table_locks;
};

using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;

/// Sink to a storage that discards everything written into it.
class NullSinkToStorage : public SinkToStorage
{
public:
    using SinkToStorage::SinkToStorage;
    std::string getName() const override { return "NullSinkToStorage"; }
    /// Intentionally drop every chunk.
    void consume(Chunk) override {}
};
}

View File

@ -1,6 +1,6 @@
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/ResizeProcessor.h>
namespace DB

View File

@ -1,4 +1,5 @@
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <DataStreams/IBlockOutputStream.h>
@ -49,7 +50,7 @@ void CreatingSetsTransform::startSubquery()
LOG_TRACE(log, "Filling temporary table.");
if (subquery.table)
table_out = subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext());
table_out = std::make_shared<PushingToSinkBlockOutputStream>(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));
done_with_set = !subquery.set;
done_with_table = !subquery.table;

View File

@ -1,7 +1,7 @@
#include <gtest/gtest.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Columns/ColumnsNumber.h>

View File

@ -7,6 +7,7 @@
#include <Common/SettingsChanges.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
@ -943,7 +944,7 @@ namespace
{
/// The data will be written directly to the table.
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto out_stream = storage->write(ASTPtr(), metadata_snapshot, query_context);
auto out_stream = std::make_shared<PushingToSinkBlockOutputStream>(storage->write(ASTPtr(), metadata_snapshot, query_context));
ReadBufferFromMemory data(external_table.data().data(), external_table.data().size());
String format = external_table.format();
if (format.empty())

View File

@ -19,6 +19,7 @@
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h>
@ -1330,7 +1331,7 @@ bool TCPHandler::receiveData(bool scalar)
}
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
/// The data will be written directly to the table.
auto temporary_table_out = storage->write(ASTPtr(), metadata_snapshot, query_context);
auto temporary_table_out = std::make_shared<PushingToSinkBlockOutputStream>(storage->write(ASTPtr(), metadata_snapshot, query_context));
temporary_table_out->write(block);
temporary_table_out->writeSuffix();

View File

@ -1,4 +1,4 @@
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DistributedSink.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>
#include <Disks/StoragePolicy.h>
@ -86,7 +86,7 @@ static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & bl
}
DistributedBlockOutputStream::DistributedBlockOutputStream(
DistributedSink::DistributedSink(
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -95,7 +95,8 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_)
: context(Context::createCopy(context_))
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, context(Context::createCopy(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(query_ast_)
@ -115,24 +116,15 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
}
Block DistributedBlockOutputStream::getHeader() const
void DistributedSink::consume(Chunk chunk)
{
if (!allow_materialized)
return metadata_snapshot->getSampleBlockNonMaterialized();
else
return metadata_snapshot->getSampleBlock();
}
if (is_first_chunk)
{
storage.delayInsertOrThrowIfNeeded();
is_first_chunk = false;
}
void DistributedBlockOutputStream::writePrefix()
{
storage.delayInsertOrThrowIfNeeded();
}
void DistributedBlockOutputStream::write(const Block & block)
{
Block ordinary_block{ block };
auto ordinary_block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (!allow_materialized)
{
@ -155,7 +147,7 @@ void DistributedBlockOutputStream::write(const Block & block)
writeAsync(ordinary_block);
}
void DistributedBlockOutputStream::writeAsync(const Block & block)
void DistributedSink::writeAsync(const Block & block)
{
if (random_shard_insert)
{
@ -174,7 +166,7 @@ void DistributedBlockOutputStream::writeAsync(const Block & block)
}
std::string DistributedBlockOutputStream::getCurrentStateDescription()
std::string DistributedSink::getCurrentStateDescription()
{
WriteBufferFromOwnString buffer;
const auto & addresses = cluster->getShardsAddresses();
@ -203,7 +195,7 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
}
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
void DistributedSink::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
const Settings & settings = context->getSettingsRef();
const auto & addresses_with_failovers = cluster->getShardsAddresses();
@ -249,7 +241,7 @@ void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, si
}
void DistributedBlockOutputStream::waitForJobs()
void DistributedSink::waitForJobs()
{
pool->wait();
@ -279,7 +271,7 @@ void DistributedBlockOutputStream::waitForJobs()
ThreadPool::Job
DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards)
DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, size_t num_shards)
{
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
@ -403,7 +395,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
}
void DistributedBlockOutputStream::writeSync(const Block & block)
void DistributedSink::writeSync(const Block & block)
{
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
@ -487,7 +479,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
}
void DistributedBlockOutputStream::writeSuffix()
void DistributedSink::onFinish()
{
auto log_performance = [this]()
{
@ -537,7 +529,7 @@ void DistributedBlockOutputStream::writeSuffix()
}
IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block) const
IColumn::Selector DistributedSink::createSelector(const Block & source_block) const
{
Block current_block_with_sharding_key_expr = source_block;
storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);
@ -548,7 +540,7 @@ IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & sou
}
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
Blocks DistributedSink::splitBlock(const Block & block)
{
auto selector = createSelector(block);
@ -572,7 +564,7 @@ Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
}
void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
void DistributedSink::writeSplitAsync(const Block & block)
{
Blocks splitted_blocks = splitBlock(block);
const size_t num_shards = splitted_blocks.size();
@ -585,7 +577,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
}
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
@ -621,7 +613,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
}
void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
{
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
@ -633,7 +625,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repe
}
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();

View File

@ -1,7 +1,7 @@
#pragma once
#include <Parsers/formatAST.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Core/Block.h>
#include <Common/PODArray.h>
@ -34,10 +34,10 @@ class StorageDistributed;
* and the resulting blocks are written in a compressed Native format in separate directories for sending.
* For each destination address (each directory with data to send), a separate thread is created in StorageDistributed,
* which monitors the directory and sends data. */
class DistributedBlockOutputStream : public IBlockOutputStream
class DistributedSink : public SinkToStorage
{
public:
DistributedBlockOutputStream(
DistributedSink(
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -47,11 +47,9 @@ public:
UInt64 insert_timeout_,
StorageID main_table_);
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;
String getName() const override { return "DistributedSink"; }
void consume(Chunk chunk) override;
void onFinish() override;
private:
IColumn::Selector createSelector(const Block & source_block) const;
@ -77,7 +75,7 @@ private:
void initWritingJobs(const Block & first_block, size_t start, size_t end);
struct JobReplica;
ThreadPool::Job runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards);
ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block, size_t num_shards);
void waitForJobs();
@ -97,6 +95,8 @@ private:
bool random_shard_insert;
bool allow_materialized;
bool is_first_chunk = true;
/// Sync-related stuff
UInt64 insert_timeout; // in seconds
StorageID main_table;

View File

@ -15,7 +15,7 @@
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Common/parseGlobs.h>
#include <Poco/URI.h>
@ -172,36 +172,33 @@ private:
Block sample_block;
};
class HDFSBlockOutputStream : public IBlockOutputStream
class HDFSSink : public SinkToStorage
{
public:
HDFSBlockOutputStream(const String & uri,
HDFSSink(const String & uri,
const String & format,
const Block & sample_block_,
const Block & sample_block,
ContextPtr context,
const CompressionMethod compression_method)
: sample_block(sample_block_)
: SinkToStorage(sample_block)
{
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context->getGlobalContext()->getConfigRef()), compression_method, 3);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context);
}
Block getHeader() const override
String getName() const override { return "HDFSSink"; }
void consume(Chunk chunk) override
{
return sample_block;
if (is_first_chunk)
{
writer->writePrefix();
is_first_chunk = false;
}
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void write(const Block & block) override
{
writer->write(block);
}
void writePrefix() override
{
writer->writePrefix();
}
void writeSuffix() override
void onFinish() override
{
try
{
@ -218,9 +215,9 @@ public:
}
private:
Block sample_block;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool is_first_chunk = true;
};
/* Recursive directory listing with matched paths as a result.
@ -314,9 +311,9 @@ Pipe StorageHDFS::read(
return Pipe::unitePipes(std::move(pipes));
}
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
return std::make_shared<HDFSBlockOutputStream>(uri,
return std::make_shared<HDFSSink>(uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),

View File

@ -32,7 +32,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, TableExclusiveLockHolder &) override;

View File

@ -51,6 +51,9 @@ class Pipe;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
class SinkToStorage;
using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
class QueryPipeline;
using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
@ -326,7 +329,7 @@ public:
* changed during lifetime of the returned streams, but the snapshot is
* guaranteed to be immutable.
*/
virtual BlockOutputStreamPtr write(
virtual SinkToStoragePtr write(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*context*/)

View File

@ -6,30 +6,26 @@
namespace DB
{
KafkaBlockOutputStream::KafkaBlockOutputStream(
KafkaSink::KafkaSink(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const ContextPtr & context_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
{
}
Block KafkaBlockOutputStream::getHeader() const
void KafkaSink::onStart()
{
return metadata_snapshot->getSampleBlockNonMaterialized();
}
void KafkaBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer(getHeader());
buffer = storage.createWriteBuffer(getPort().getHeader());
auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
getHeader(), context,
getPort().getHeader(), context,
[this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
@ -37,20 +33,17 @@ void KafkaBlockOutputStream::writePrefix()
format_settings);
}
void KafkaBlockOutputStream::write(const Block & block)
void KafkaSink::consume(Chunk chunk)
{
child->write(block);
child->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void KafkaBlockOutputStream::writeSuffix()
void KafkaSink::onFinish()
{
if (child)
child->writeSuffix();
flush();
}
//flush();
void KafkaBlockOutputStream::flush()
{
if (buffer)
buffer->flush();
}

View File

@ -1,26 +1,25 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/Kafka/StorageKafka.h>
namespace DB
{
class KafkaBlockOutputStream : public IBlockOutputStream
class KafkaSink : public SinkToStorage
{
public:
explicit KafkaBlockOutputStream(
explicit KafkaSink(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<const Context> & context_);
Block getHeader() const override;
void consume(Chunk chunk) override;
void onStart() override;
void onFinish() override;
String getName() const override { return "KafkaSink"; }
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
void flush() override;
///void flush() override;
private:
StorageKafka & storage;

View File

@ -289,14 +289,14 @@ Pipe StorageKafka::read(
}
BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
auto modified_context = Context::createCopy(local_context);
modified_context->applySettingsChanges(settings_adjustments);
if (topics.size() > 1)
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<KafkaBlockOutputStream>(*this, metadata_snapshot, modified_context);
return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context);
}

View File

@ -50,7 +50,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>

View File

@ -1,5 +1,5 @@
#pragma once
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/StorageMergeTree.h>
#include <Interpreters/PartLog.h>
@ -7,13 +7,7 @@
namespace DB
{
Block MergeTreeBlockOutputStream::getHeader() const
{
return metadata_snapshot->getSampleBlock();
}
void MergeTreeBlockOutputStream::writePrefix()
void MergeTreeSink::onStart()
{
/// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
@ -21,8 +15,10 @@ void MergeTreeBlockOutputStream::writePrefix()
}
void MergeTreeBlockOutputStream::write(const Block & block)
void MergeTreeSink::consume(Chunk chunk)
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
{

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
@ -11,24 +11,25 @@ class Block;
class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
class MergeTreeSink : public SinkToStorage
{
public:
/// Sink that writes inserted blocks as new data parts of a StorageMergeTree.
/// The sink header is the storage's full sample block taken from the given
/// metadata snapshot, which is captured so that the whole INSERT uses one
/// consistent schema.
MergeTreeSink(
    StorageMergeTree & storage_,
    const StorageMetadataPtr metadata_snapshot_,
    size_t max_parts_per_block_,
    ContextPtr context_)
    : SinkToStorage(metadata_snapshot_->getSampleBlock())
    , storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , max_parts_per_block(max_parts_per_block_)
    , context(context_)
{
}
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
String getName() const override { return "MergeTreeSink"; }
void consume(Chunk chunk) override;
void onStart() override;
private:
StorageMergeTree & storage;

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Interpreters/Context.h>
@ -8,7 +8,7 @@ namespace DB
{
MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcessor(
MergeTreeThreadSelectProcessor::MergeTreeThreadSelectProcessor(
const size_t thread_,
const MergeTreeReadPoolPtr & pool_,
const size_t min_marks_to_read_,
@ -46,7 +46,7 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
}
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
bool MergeTreeThreadSelectProcessor::getNewTask()
{
task = pool->getTask(min_marks_to_read, thread, ordered_names);
@ -107,6 +107,6 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
}
MergeTreeThreadSelectBlockInputProcessor::~MergeTreeThreadSelectBlockInputProcessor() = default;
MergeTreeThreadSelectProcessor::~MergeTreeThreadSelectProcessor() = default;
}

View File

@ -11,10 +11,10 @@ class MergeTreeReadPool;
/** Used in conjunction with MergeTreeReadPool, asking it for more work to do and performing whatever reads it is asked
* to perform.
*/
class MergeTreeThreadSelectBlockInputProcessor : public MergeTreeBaseSelectProcessor
class MergeTreeThreadSelectProcessor : public MergeTreeBaseSelectProcessor
{
public:
MergeTreeThreadSelectBlockInputProcessor(
MergeTreeThreadSelectProcessor(
const size_t thread_,
const std::shared_ptr<MergeTreeReadPool> & pool_,
const size_t min_marks_to_read_,
@ -32,7 +32,7 @@ public:
String getName() const override { return "MergeTreeThread"; }
~MergeTreeThreadSelectBlockInputProcessor() override;
~MergeTreeThreadSelectProcessor() override;
protected:
/// Requests read task from MergeTreeReadPool and signals whether it got one

View File

@ -1,6 +1,6 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
@ -33,7 +33,7 @@ namespace ErrorCodes
}
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_,
@ -43,7 +43,8 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
bool deduplicate_,
ContextPtr context_,
bool is_attach_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, quorum(quorum_)
, quorum_timeout_ms(quorum_timeout_ms_)
@ -60,12 +61,6 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
}
Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
return metadata_snapshot->getSampleBlock();
}
/// Allow to verify that the session in ZooKeeper is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
@ -77,7 +72,7 @@ static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
}
void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
void ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
@ -121,8 +116,10 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
}
void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
last_block_is_duplicate = false;
auto zookeeper = storage.getZooKeeper();
@ -183,7 +180,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
}
void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
last_block_is_duplicate = false;
@ -210,7 +207,7 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta
}
void ReplicatedMergeTreeBlockOutputStream::commitPart(
void ReplicatedMergeTreeSink::commitPart(
zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
metadata_snapshot->check(part->getColumns());
@ -507,7 +504,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
}
}
void ReplicatedMergeTreeBlockOutputStream::writePrefix()
void ReplicatedMergeTreeSink::onStart()
{
/// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
@ -515,7 +512,7 @@ void ReplicatedMergeTreeBlockOutputStream::writePrefix()
}
void ReplicatedMergeTreeBlockOutputStream::waitForQuorum(
void ReplicatedMergeTreeSink::waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper,
const std::string & part_name,
const std::string & quorum_path,

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <common/types.h>
@ -19,10 +19,10 @@ namespace DB
class StorageReplicatedMergeTree;
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
class ReplicatedMergeTreeSink : public SinkToStorage
{
public:
ReplicatedMergeTreeBlockOutputStream(
ReplicatedMergeTreeSink(
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_,
@ -35,9 +35,10 @@ public:
// needed to set the special LogEntryType::ATTACH_PART
bool is_attach_ = false);
Block getHeader() const override;
void writePrefix() override;
void write(const Block & block) override;
void onStart() override;
void consume(Chunk chunk) override;
String getName() const override { return "ReplicatedMergeTreeSink"; }
/// For ATTACHing existing data on filesystem.
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);

View File

@ -1,29 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
namespace DB
{
class RabbitMQBlockOutputStream : public IBlockOutputStream
{
public:
explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, ContextPtr context_);
Block getHeader() const override;
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
ContextPtr context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
};
}

View File

@ -1,4 +1,4 @@
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/RabbitMQSink.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Formats/FormatFactory.h>
@ -8,24 +8,19 @@
namespace DB
{
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
RabbitMQSink::RabbitMQSink(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
ContextPtr context_)
: storage(storage_)
: SinkToStorage(metadata_snapshot->getSampleBlockNonMaterialized())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
{
}
Block RabbitMQBlockOutputStream::getHeader() const
{
return metadata_snapshot->getSampleBlockNonMaterialized();
}
void RabbitMQBlockOutputStream::writePrefix()
void RabbitMQSink::onStart()
{
if (!storage.exchangeRemoved())
storage.unbindExchange();
@ -37,7 +32,7 @@ void RabbitMQBlockOutputStream::writePrefix()
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
getHeader(), context,
getPort().getHeader(), context,
[this](const Columns & /* columns */, size_t /* rows */)
{
buffer->countRow();
@ -46,13 +41,13 @@ void RabbitMQBlockOutputStream::writePrefix()
}
void RabbitMQBlockOutputStream::write(const Block & block)
void RabbitMQSink::consume(Chunk chunk)
{
child->write(block);
child->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void RabbitMQBlockOutputStream::writeSuffix()
void RabbitMQSink::onFinish()
{
child->writeSuffix();

View File

@ -0,0 +1,29 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
namespace DB
{
class RabbitMQSink : public SinkToStorage
{
public:
explicit RabbitMQSink(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, ContextPtr context_);
void onStart() override;
void consume(Chunk chunk) override;
void onFinish() override;
String getName() const override { return "RabbitMQSink"; }
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
ContextPtr context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
};
}

View File

@ -14,7 +14,7 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/RabbitMQSink.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/StorageFactory.h>
@ -644,9 +644,9 @@ Pipe StorageRabbitMQ::read(
}
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
return std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, local_context);
return std::make_shared<RabbitMQSink>(*this, metadata_snapshot, local_context);
}

View File

@ -50,7 +50,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context) override;

View File

@ -1,4 +1,4 @@
#include <Storages/RocksDB/EmbeddedRocksDBBlockOutputStream.h>
#include <Storages/RocksDB/EmbeddedRocksDBSink.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <IO/WriteBufferFromString.h>
@ -13,14 +13,14 @@ namespace ErrorCodes
extern const int ROCKSDB_ERROR;
}
EmbeddedRocksDBBlockOutputStream::EmbeddedRocksDBBlockOutputStream(
EmbeddedRocksDBSink::EmbeddedRocksDBSink(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{
Block sample_block = metadata_snapshot->getSampleBlock();
for (const auto & elem : sample_block)
for (const auto & elem : getPort().getHeader())
{
if (elem.name == storage.primary_key)
break;
@ -28,15 +28,10 @@ EmbeddedRocksDBBlockOutputStream::EmbeddedRocksDBBlockOutputStream(
}
}
Block EmbeddedRocksDBBlockOutputStream::getHeader() const
void EmbeddedRocksDBSink::consume(Chunk chunk)
{
return metadata_snapshot->getSampleBlock();
}
void EmbeddedRocksDBBlockOutputStream::write(const Block & block)
{
metadata_snapshot->check(block, true);
auto rows = block.rows();
auto rows = chunk.getNumRows();
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
@ -10,15 +10,15 @@ class StorageEmbeddedRocksDB;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
class EmbeddedRocksDBBlockOutputStream : public IBlockOutputStream
class EmbeddedRocksDBSink : public SinkToStorage
{
public:
EmbeddedRocksDBBlockOutputStream(
EmbeddedRocksDBSink(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_);
Block getHeader() const override;
void write(const Block & block) override;
void consume(Chunk chunk) override;
String getName() const override { return "EmbeddedRocksDBSink"; }
private:
StorageEmbeddedRocksDB & storage;

View File

@ -1,5 +1,5 @@
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockOutputStream.h>
#include <Storages/RocksDB/EmbeddedRocksDBSink.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
@ -337,10 +337,10 @@ Pipe StorageEmbeddedRocksDB::read(
}
}
BlockOutputStreamPtr StorageEmbeddedRocksDB::write(
SinkToStoragePtr StorageEmbeddedRocksDB::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
return std::make_shared<EmbeddedRocksDBBlockOutputStream>(*this, metadata_snapshot);
return std::make_shared<EmbeddedRocksDBSink>(*this, metadata_snapshot);
}

View File

@ -20,7 +20,7 @@ class StorageEmbeddedRocksDB final : public shared_ptr_helper<StorageEmbeddedRoc
{
friend struct shared_ptr_helper<StorageEmbeddedRocksDB>;
friend class EmbeddedRocksDBSource;
friend class EmbeddedRocksDBBlockOutputStream;
friend class EmbeddedRocksDBSink;
friend class EmbeddedRocksDBBlockInputStream;
public:
std::string getName() const override { return "EmbeddedRocksDB"; }
@ -34,7 +34,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
bool supportsParallelInsert() const override { return true; }

View File

@ -25,6 +25,7 @@
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/UnionStep.h>
@ -513,30 +514,30 @@ static void appendBlock(const Block & from, Block & to)
}
class BufferBlockOutputStream : public IBlockOutputStream
class BufferSink : public SinkToStorage
{
public:
explicit BufferBlockOutputStream(
explicit BufferSink(
StorageBuffer & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
if (!block)
return;
// Check table structure.
metadata_snapshot->check(block, true);
metadata_snapshot->check(getPort().getHeader(), true);
}
size_t rows = block.rows();
String getName() const override { return "BufferSink"; }
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
if (!rows)
return;
auto block = getPort().getHeader().cloneWithColumns(chunk.getColumns());
StoragePtr destination;
if (storage.destination_id)
{
@ -642,9 +643,9 @@ private:
};
BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
return std::make_shared<BufferBlockOutputStream>(*this, metadata_snapshot);
return std::make_shared<BufferSink>(*this, metadata_snapshot);
}

View File

@ -46,7 +46,7 @@ class StorageBuffer final : public shared_ptr_helper<StorageBuffer>, public ISto
{
friend struct shared_ptr_helper<StorageBuffer>;
friend class BufferSource;
friend class BufferBlockOutputStream;
friend class BufferSink;
public:
struct Thresholds
@ -84,7 +84,7 @@ public:
bool supportsSubcolumns() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void startup() override;
/// Flush all buffers into the subordinate table and stop background thread.

View File

@ -10,7 +10,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DistributedSink.h>
#include <Storages/StorageFactory.h>
#include <Storages/AlterCommands.h>
@ -59,7 +59,7 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <Core/Field.h>
#include <Core/Settings.h>
@ -631,7 +631,7 @@ void StorageDistributed::read(
}
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
auto cluster = getCluster();
const auto & settings = local_context->getSettingsRef();
@ -669,7 +669,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
sample_block = metadata_snapshot->getSampleBlock();
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
return std::make_shared<DistributedSink>(
local_context, *this, metadata_snapshot,
createInsertToRemoteTableQuery(remote_database, remote_table, sample_block),
cluster, insert_sync, timeout, StorageID{remote_database, remote_table});

View File

@ -39,7 +39,7 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class StorageDistributed final : public shared_ptr_helper<StorageDistributed>, public IStorage, WithContext
{
friend struct shared_ptr_helper<StorageDistributed>;
friend class DistributedBlockOutputStream;
friend class DistributedSink;
friend class StorageDistributedDirectoryMonitor;
friend class StorageSystemDistributionQueue;
@ -81,7 +81,7 @@ public:
bool supportsParallelInsert() const override { return true; }
std::optional<UInt64> totalBytes(const Settings &) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
QueryPipelinePtr distributedWrite(const ASTInsertQuery & query, ContextPtr context) override;

View File

@ -17,6 +17,7 @@
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Common/escapeForFileName.h>
@ -525,10 +526,10 @@ Pipe StorageFile::read(
}
class StorageFileBlockOutputStream : public IBlockOutputStream
class StorageFileSink final : public SinkToStorage
{
public:
explicit StorageFileBlockOutputStream(
explicit StorageFileSink(
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
@ -536,7 +537,8 @@ public:
ContextPtr context,
const std::optional<FormatSettings> & format_settings,
int & flags)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_))
{
@ -567,29 +569,29 @@ public:
{}, format_settings);
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
String getName() const override { return "StorageFileSink"; }
void write(const Block & block) override
{
writer->write(block);
}
void writePrefix() override
void onStart() override
{
if (!prefix_written)
writer->writePrefix();
prefix_written = true;
}
void writeSuffix() override
void consume(Chunk chunk) override
{
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void onFinish() override
{
writer->writeSuffix();
}
void flush() override
{
writer->flush();
}
// void flush() override
// {
// writer->flush();
// }
private:
StorageFile & storage;
@ -600,7 +602,7 @@ private:
bool prefix_written{false};
};
BlockOutputStreamPtr StorageFile::write(
SinkToStoragePtr StorageFile::write(
const ASTPtr & /*query*/,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context)
@ -620,7 +622,7 @@ BlockOutputStreamPtr StorageFile::write(
fs::create_directories(fs::path(path).parent_path());
}
return std::make_shared<StorageFileBlockOutputStream>(
return std::make_shared<StorageFileSink>(
*this,
metadata_snapshot,
std::unique_lock{rwlock, getLockTimeout(context)},

View File

@ -29,7 +29,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;
@ -68,7 +68,7 @@ public:
protected:
friend class StorageFileSource;
friend class StorageFileBlockOutputStream;
friend class StorageFileSink;
/// From file descriptor
StorageFile(int table_fd_, CommonArguments args);

View File

@ -67,7 +67,7 @@ StorageJoin::StorageJoin(
restore();
}
BlockOutputStreamPtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::lock_guard mutate_lock(mutate_mutex);
return StorageSetOrJoinBase::write(query, metadata_snapshot, context);

View File

@ -45,7 +45,7 @@ public:
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
Pipe read(
const Names & column_names,

View File

@ -25,6 +25,7 @@
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <cassert>
#include <chrono>
@ -204,12 +205,13 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu
}
class LogBlockOutputStream final : public IBlockOutputStream
class LogSink final : public SinkToStorage
{
public:
explicit LogBlockOutputStream(
explicit LogSink(
StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_))
, marks_stream(
@ -227,7 +229,9 @@ public:
}
}
~LogBlockOutputStream() override
String getName() const override { return "LogSink"; }
~LogSink() override
{
try
{
@ -244,9 +248,8 @@ public:
}
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
void writeSuffix() override;
void consume(Chunk chunk) override;
void onFinish() override;
private:
StorageLog & storage;
@ -301,8 +304,9 @@ private:
};
void LogBlockOutputStream::write(const Block & block)
void LogSink::consume(Chunk chunk)
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
metadata_snapshot->check(block, true);
/// The set of written offset columns so that you do not write shared offsets of columns for nested structures multiple times
@ -321,14 +325,14 @@ void LogBlockOutputStream::write(const Block & block)
}
void LogBlockOutputStream::writeSuffix()
void LogSink::onFinish()
{
if (done)
return;
WrittenStreams written_streams;
ISerialization::SerializeBinaryBulkSettings settings;
for (const auto & column : getHeader())
for (const auto & column : getPort().getHeader())
{
auto it = serialize_states.find(column.name);
if (it != serialize_states.end())
@ -365,7 +369,7 @@ void LogBlockOutputStream::writeSuffix()
}
ISerialization::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const NameAndTypePair & name_and_type,
ISerialization::OutputStreamGetter LogSink::createStreamGetter(const NameAndTypePair & name_and_type,
WrittenStreams & written_streams)
{
return [&] (const ISerialization::SubstreamPath & path) -> WriteBuffer *
@ -383,7 +387,7 @@ ISerialization::OutputStreamGetter LogBlockOutputStream::createStreamGetter(cons
}
void LogBlockOutputStream::writeData(const NameAndTypePair & name_and_type, const IColumn & column,
void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & column,
MarksForColumns & out_marks, WrittenStreams & written_streams)
{
ISerialization::SerializeBinaryBulkSettings settings;
@ -442,7 +446,7 @@ void LogBlockOutputStream::writeData(const NameAndTypePair & name_and_type, cons
}
void LogBlockOutputStream::writeMarks(MarksForColumns && marks)
void LogSink::writeMarks(MarksForColumns && marks)
{
if (marks.size() != storage.file_count)
throw Exception("Wrong number of marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);
@ -697,7 +701,7 @@ Pipe StorageLog::read(
return Pipe::unitePipes(std::move(pipes));
}
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
auto lock_timeout = getLockTimeout(context);
loadMarks(lock_timeout);
@ -706,7 +710,7 @@ BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMe
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
return std::make_shared<LogSink>(*this, metadata_snapshot, std::move(lock));
}
CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr context)

View File

@ -19,7 +19,7 @@ namespace DB
class StorageLog final : public shared_ptr_helper<StorageLog>, public IStorage
{
friend class LogSource;
friend class LogBlockOutputStream;
friend class LogSink;
friend struct shared_ptr_helper<StorageLog>;
public:
@ -34,7 +34,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

View File

@ -30,7 +30,7 @@ public:
const Names & column_names, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info,
ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr) override { throwNotAllowed(); }
SinkToStoragePtr write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr) override { throwNotAllowed(); }
NamesAndTypesList getVirtuals() const override;
ColumnSizeByName getColumnSizes() const override;

View File

@ -26,6 +26,7 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
{
@ -214,16 +215,16 @@ void StorageMaterializedView::read(
}
}
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr local_context)
SinkToStoragePtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr local_context)
{
auto storage = getTargetTable();
auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto stream = storage->write(query, metadata_snapshot, local_context);
auto sink = storage->write(query, metadata_snapshot, local_context);
stream->addTableLock(lock);
return stream;
sink->addTableLock(lock);
return sink;
}

View File

@ -34,7 +34,7 @@ public:
return target_table->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void drop() override;
void dropInnerTableIfAny(bool no_delay, ContextPtr local_context) override;

View File

@ -9,6 +9,7 @@
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
@ -98,21 +99,23 @@ private:
};
class MemoryBlockOutputStream : public IBlockOutputStream
class MemorySink : public SinkToStorage
{
public:
MemoryBlockOutputStream(
MemorySink(
StorageMemory & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
String getName() const override { return "MemorySink"; }
void write(const Block & block) override
void consume(Chunk chunk) override
{
auto block = getPort().getHeader().cloneWithColumns(chunk.getColumns());
metadata_snapshot->check(block, true);
if (storage.compress)
@ -129,7 +132,7 @@ public:
}
}
void writeSuffix() override
void onFinish() override
{
size_t inserted_bytes = 0;
size_t inserted_rows = 0;
@ -226,9 +229,9 @@ Pipe StorageMemory::read(
}
BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageMemory::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
return std::make_shared<MemoryBlockOutputStream>(*this, metadata_snapshot);
return std::make_shared<MemorySink>(*this, metadata_snapshot);
}

View File

@ -22,7 +22,7 @@ namespace DB
*/
class StorageMemory final : public shared_ptr_helper<StorageMemory>, public IStorage
{
friend class MemoryBlockOutputStream;
friend class MemorySink;
friend struct shared_ptr_helper<StorageMemory>;
public:
@ -47,7 +47,7 @@ public:
bool hasEvenlyDistributedRead() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
void drop() override;

View File

@ -18,7 +18,7 @@
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/MergeList.h>
@ -223,11 +223,11 @@ std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const
return getTotalActiveSizeInBytes();
}
BlockOutputStreamPtr
SinkToStoragePtr
StorageMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
const auto & settings = local_context->getSettingsRef();
return std::make_shared<MergeTreeBlockOutputStream>(
return std::make_shared<MergeTreeSink>(
*this, metadata_snapshot, settings.max_partitions_per_insert_block, local_context);
}

View File

@ -61,7 +61,7 @@ public:
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo &, ContextPtr) const override;
std::optional<UInt64> totalBytes(const Settings &) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
/** Perform the next step in combining the parts.
*/
@ -239,7 +239,7 @@ private:
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
friend class MergeTreeProjectionBlockOutputStream;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeSink;
friend class MergeTreeData;

View File

@ -18,6 +18,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <mysqlxx/Transaction.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h>
#include <Common/parseRemoteDescription.h>
@ -108,17 +109,18 @@ Pipe StorageMySQL::read(
}
class StorageMySQLBlockOutputStream : public IBlockOutputStream
class StorageMySQLSink : public SinkToStorage
{
public:
explicit StorageMySQLBlockOutputStream(
explicit StorageMySQLSink(
const StorageMySQL & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const mysqlxx::PoolWithFailover::Entry & entry_,
const size_t & mysql_max_rows_to_insert)
: storage{storage_}
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage{storage_}
, metadata_snapshot{metadata_snapshot_}
, remote_database_name{remote_database_name_}
, remote_table_name{remote_table_name_}
@ -127,10 +129,11 @@ public:
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
String getName() const override { return "StorageMySQLSink"; }
void write(const Block & block) override
void consume(Chunk chunk) override
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto blocks = splitBlocks(block, max_batch_rows);
mysqlxx::Transaction trans(entry);
try
@ -221,9 +224,9 @@ private:
};
BlockOutputStreamPtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
return std::make_shared<StorageMySQLBlockOutputStream>(
return std::make_shared<StorageMySQLSink>(
*this,
metadata_snapshot,
remote_database_name,

View File

@ -48,10 +48,10 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
private:
friend class StorageMySQLBlockOutputStream;
friend class StorageMySQLSink;
std::string remote_database_name;
std::string remote_table_name;

View File

@ -6,6 +6,7 @@
#include <Storages/IStorage.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h>
@ -36,9 +37,9 @@ public:
bool supportsParallelInsert() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr) override
SinkToStoragePtr write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr) override
{
return std::make_shared<NullBlockOutputStream>(metadata_snapshot->getSampleBlock());
return std::make_shared<NullSinkToStorage>(metadata_snapshot->getSampleBlock());
}
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

View File

@ -27,6 +27,7 @@
#include <Processors/Sources/SourceFromInputStream.h>
#include <Common/parseRemoteDescription.h>
#include <Processors/Pipe.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <IO/WriteHelpers.h>
@ -94,25 +95,27 @@ Pipe StoragePostgreSQL::read(
}
class PostgreSQLBlockOutputStream : public IBlockOutputStream
class PostgreSQLSink : public SinkToStorage
{
public:
explicit PostgreSQLBlockOutputStream(
explicit PostgreSQLSink(
const StorageMetadataPtr & metadata_snapshot_,
postgres::ConnectionHolderPtr connection_holder_,
const String & remote_table_name_,
const String & remote_table_schema_)
: metadata_snapshot(metadata_snapshot_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, metadata_snapshot(metadata_snapshot_)
, connection_holder(std::move(connection_holder_))
, remote_table_name(remote_table_name_)
, remote_table_schema(remote_table_schema_)
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
String getName() const override { return "PostgreSQLSink"; }
void write(const Block & block) override
void consume(Chunk chunk) override
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (!inserter)
inserter = std::make_unique<StreamTo>(connection_holder->get(),
remote_table_schema.empty() ? pqxx::table_path({remote_table_name})
@ -155,7 +158,7 @@ public:
}
}
void writeSuffix() override
void onFinish() override
{
if (inserter)
inserter->complete();
@ -295,10 +298,10 @@ private:
};
BlockOutputStreamPtr StoragePostgreSQL::write(
SinkToStoragePtr StoragePostgreSQL::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /* context */)
{
return std::make_shared<PostgreSQLBlockOutputStream>(metadata_snapshot, pool->get(), remote_table_name, remote_table_schema);
return std::make_shared<PostgreSQLSink>(metadata_snapshot, pool->get(), remote_table_name, remote_table_schema);
}

View File

@ -41,7 +41,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
private:
friend class PostgreSQLBlockOutputStream;

View File

@ -63,7 +63,7 @@ public:
return getNested()->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override
{
return getNested()->write(query, metadata_snapshot, context);
}

View File

@ -21,7 +21,7 @@
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
@ -4554,7 +4554,7 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
}
BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
const auto storage_settings_ptr = getSettings();
assertNotReadonly();
@ -4563,7 +4563,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
// TODO: should we also somehow pass list of columns to deduplicate on to the ReplicatedMergeTreeBlockOutputStream ?
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(
return std::make_shared<ReplicatedMergeTreeSink>(
*this, metadata_snapshot, query_settings.insert_quorum,
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
@ -5263,7 +5263,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
/// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context,
ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context,
/*is_attach*/true);
for (size_t i = 0; i < loaded_parts.size(); ++i)

View File

@ -116,7 +116,7 @@ public:
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
bool optimize(
const ASTPtr & query,
@ -269,7 +269,7 @@ private:
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
friend class ReplicatedMergeTreeBlockOutputStream;
friend class ReplicatedMergeTreeSink;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;

View File

@ -38,6 +38,7 @@
#include <re2/re2.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Pipe.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -264,10 +265,10 @@ Chunk StorageS3Source::generate()
}
class StorageS3BlockOutputStream : public IBlockOutputStream
class StorageS3Sink : public SinkToStorage
{
public:
StorageS3BlockOutputStream(
StorageS3Sink(
const String & format,
const Block & sample_block_,
ContextPtr context,
@ -277,34 +278,32 @@ public:
const String & key,
size_t min_upload_part_size,
size_t max_single_part_upload_size)
: sample_block(sample_block_)
: SinkToStorage(sample_block_)
, sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context);
}
Block getHeader() const override
String getName() const override { return "StorageS3Sink"; }
void consume(Chunk chunk) override
{
return sample_block;
if (is_first_chunk)
{
writer->writePrefix();
is_first_chunk = false;
}
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void write(const Block & block) override
{
writer->write(block);
}
// void flush() override
// {
// writer->flush();
// }
void writePrefix() override
{
writer->writePrefix();
}
void flush() override
{
writer->flush();
}
void writeSuffix() override
void onFinish() override
{
try
{
@ -324,6 +323,7 @@ private:
Block sample_block;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool is_first_chunk = true;
};
@ -426,10 +426,10 @@ Pipe StorageS3::read(
return pipe;
}
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
updateClientAndAuthSettings(local_context, client_auth);
return std::make_shared<StorageS3BlockOutputStream>(
return std::make_shared<StorageS3Sink>(
format_name,
metadata_snapshot->getSampleBlock(),
local_context,

View File

@ -130,7 +130,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override;

View File

@ -12,6 +12,7 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Common/filesystemHelpers.h>
@ -82,25 +83,27 @@ Pipe StorageSQLite::read(
}
class SQLiteBlockOutputStream : public IBlockOutputStream
class SQLiteSink : public SinkToStorage
{
public:
explicit SQLiteBlockOutputStream(
explicit SQLiteSink(
const StorageSQLite & storage_,
const StorageMetadataPtr & metadata_snapshot_,
StorageSQLite::SQLitePtr sqlite_db_,
const String & remote_table_name_)
: storage{storage_}
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage{storage_}
, metadata_snapshot(metadata_snapshot_)
, sqlite_db(sqlite_db_)
, remote_table_name(remote_table_name_)
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
String getName() const override { return "SQLiteSink"; }
void write(const Block & block) override
void consume(Chunk chunk) override
{
auto block = getPort().getHeader().cloneWithColumns(chunk.getColumns());
WriteBufferFromOwnString sqlbuf;
sqlbuf << "INSERT INTO ";
@ -142,11 +145,11 @@ private:
};
BlockOutputStreamPtr StorageSQLite::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr)
SinkToStoragePtr StorageSQLite::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr)
{
if (!sqlite_db)
sqlite_db = openSQLiteDB(database_path, getContext(), /* throw_on_error */true);
return std::make_shared<SQLiteBlockOutputStream>(*this, metadata_snapshot, sqlite_db, remote_table_name);
return std::make_shared<SQLiteSink>(*this, metadata_snapshot, sqlite_db, remote_table_name);
}

View File

@ -41,7 +41,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
private:
String remote_table_name;

View File

@ -13,6 +13,7 @@
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
@ -30,17 +31,17 @@ namespace ErrorCodes
}
class SetOrJoinBlockOutputStream : public IBlockOutputStream
class SetOrJoinSink : public SinkToStorage
{
public:
SetOrJoinBlockOutputStream(
SetOrJoinSink(
StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_, const String & backup_tmp_path_,
const String & backup_file_name_, bool persistent_);
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
void writeSuffix() override;
String getName() const override { return "SetOrJoinSink"; }
void consume(Chunk chunk) override;
void onFinish() override;
private:
StorageSetOrJoinBase & table;
@ -55,14 +56,15 @@ private:
};
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
SetOrJoinSink::SetOrJoinSink(
StorageSetOrJoinBase & table_,
const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_,
const String & backup_tmp_path_,
const String & backup_file_name_,
bool persistent_)
: table(table_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, table(table_)
, metadata_snapshot(metadata_snapshot_)
, backup_path(backup_path_)
, backup_tmp_path(backup_tmp_path_)
@ -74,17 +76,17 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
{
}
void SetOrJoinBlockOutputStream::write(const Block & block)
void SetOrJoinSink::consume(Chunk chunk)
{
/// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
Block sorted_block = block.sortColumns();
Block sorted_block = getPort().getHeader().cloneWithColumns(chunk.detachColumns()).sortColumns();
table.insertBlock(sorted_block);
if (persistent)
backup_stream.write(sorted_block);
}
void SetOrJoinBlockOutputStream::writeSuffix()
void SetOrJoinSink::onFinish()
{
table.finishInsert();
if (persistent)
@ -99,10 +101,10 @@ void SetOrJoinBlockOutputStream::writeSuffix()
}
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
UInt64 id = ++increment;
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin", persistent);
return std::make_shared<SetOrJoinSink>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin", persistent);
}

View File

@ -18,12 +18,12 @@ using SetPtr = std::shared_ptr<Set>;
*/
class StorageSetOrJoinBase : public IStorage
{
friend class SetOrJoinBlockOutputStream;
friend class SetOrJoinSink;
public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {path}; }

View File

@ -31,6 +31,7 @@
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h>
#include <cassert>
@ -153,12 +154,13 @@ private:
};
class StripeLogBlockOutputStream final : public IBlockOutputStream
class StripeLogSink final : public SinkToStorage
{
public:
explicit StripeLogBlockOutputStream(
explicit StripeLogSink(
StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_))
, data_out_file(storage.table_path + "data.bin")
@ -181,7 +183,9 @@ public:
}
}
~StripeLogBlockOutputStream() override
String getName() const override { return "StripeLogSink"; }
~StripeLogSink() override
{
try
{
@ -202,14 +206,12 @@ public:
}
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
void consume(Chunk chunk) override
{
block_out.write(block);
block_out.write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void writeSuffix() override
void onFinish() override
{
if (done)
return;
@ -368,13 +370,13 @@ Pipe StorageStripeLog::read(
}
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::unique_lock lock(rwlock, getLockTimeout(context));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
return std::make_shared<StripeLogSink>(*this, metadata_snapshot, std::move(lock));
}

View File

@ -19,7 +19,7 @@ namespace DB
class StorageStripeLog final : public shared_ptr_helper<StorageStripeLog>, public IStorage
{
friend class StripeLogSource;
friend class StripeLogBlockOutputStream;
friend class StripeLogSink;
friend struct shared_ptr_helper<StorageStripeLog>;
public:
@ -34,7 +34,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

View File

@ -113,7 +113,7 @@ public:
return pipe;
}
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context) override

View File

@ -23,8 +23,6 @@
#include <DataTypes/NestedUtils.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/Context.h>
@ -37,6 +35,7 @@
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
@ -192,14 +191,15 @@ void TinyLogSource::readData(const NameAndTypePair & name_and_type,
}
class TinyLogBlockOutputStream final : public IBlockOutputStream
class TinyLogSink final : public SinkToStorage
{
public:
explicit TinyLogBlockOutputStream(
explicit TinyLogSink(
StorageTinyLog & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_))
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@ -213,7 +213,7 @@ public:
}
}
~TinyLogBlockOutputStream() override
~TinyLogSink() override
{
try
{
@ -231,10 +231,10 @@ public:
}
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
String getName() const override { return "TinyLogSink"; }
void write(const Block & block) override;
void writeSuffix() override;
void consume(Chunk chunk) override;
void onFinish() override;
private:
StorageTinyLog & storage;
@ -274,7 +274,7 @@ private:
};
ISerialization::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
ISerialization::OutputStreamGetter TinyLogSink::createStreamGetter(
const NameAndTypePair & column,
WrittenStreams & written_streams)
{
@ -298,7 +298,7 @@ ISerialization::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
}
void TinyLogBlockOutputStream::writeData(const NameAndTypePair & name_and_type, const IColumn & column, WrittenStreams & written_streams)
void TinyLogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & column, WrittenStreams & written_streams)
{
ISerialization::SerializeBinaryBulkSettings settings;
const auto & [name, type] = name_and_type;
@ -318,7 +318,7 @@ void TinyLogBlockOutputStream::writeData(const NameAndTypePair & name_and_type,
}
void TinyLogBlockOutputStream::writeSuffix()
void TinyLogSink::onFinish()
{
if (done)
return;
@ -332,7 +332,7 @@ void TinyLogBlockOutputStream::writeSuffix()
WrittenStreams written_streams;
ISerialization::SerializeBinaryBulkSettings settings;
for (const auto & column : getHeader())
for (const auto & column : getPort().getHeader())
{
auto it = serialize_states.find(column.name);
if (it != serialize_states.end())
@ -365,8 +365,9 @@ void TinyLogBlockOutputStream::writeSuffix()
}
void TinyLogBlockOutputStream::write(const Block & block)
void TinyLogSink::consume(Chunk chunk)
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
metadata_snapshot->check(block, true);
/// The set of written offset columns so that you do not write shared columns for nested structures multiple times
@ -508,9 +509,9 @@ Pipe StorageTinyLog::read(
}
BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
return std::make_shared<TinyLogBlockOutputStream>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)});
return std::make_shared<TinyLogSink>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)});
}

View File

@ -18,7 +18,7 @@ namespace DB
class StorageTinyLog final : public shared_ptr_helper<StorageTinyLog>, public IStorage
{
friend class TinyLogSource;
friend class TinyLogBlockOutputStream;
friend class TinyLogSink;
friend struct shared_ptr_helper<StorageTinyLog>;
public:
@ -33,7 +33,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

View File

@ -144,14 +144,15 @@ namespace
};
}
StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block_,
ContextPtr context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: sample_block(sample_block_)
StorageURLSink::StorageURLSink(
const Poco::URI & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: SinkToStorage(sample_block)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
@ -161,17 +162,18 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
}
void StorageURLBlockOutputStream::write(const Block & block)
void StorageURLSink::consume(Chunk chunk)
{
writer->write(block);
if (is_first_chunk)
{
writer->writePrefix();
is_first_chunk = false;
}
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void StorageURLBlockOutputStream::writePrefix()
{
writer->writePrefix();
}
void StorageURLBlockOutputStream::writeSuffix()
void StorageURLSink::onFinish()
{
writer->writeSuffix();
writer->flush();
@ -293,9 +295,9 @@ Pipe StorageURLWithFailover::read(
}
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
return std::make_shared<StorageURLBlockOutputStream>(uri, format_name,
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri.toString(), compression_method));

View File

@ -3,7 +3,7 @@
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <common/shared_ptr_helper.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <Storages/StorageFactory.h>
@ -32,7 +32,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
protected:
IStorageURLBase(
@ -77,31 +77,27 @@ private:
virtual Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const = 0;
};
class StorageURLBlockOutputStream : public IBlockOutputStream
class StorageURLSink : public SinkToStorage
{
public:
StorageURLBlockOutputStream(
StorageURLSink(
const Poco::URI & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block_,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method);
Block getHeader() const override
{
return sample_block;
}
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;
std::string getName() const override { return "StorageURLSink"; }
void consume(Chunk chunk) override;
void onFinish() override;
private:
Block sample_block;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool is_first_chunk = true;
};
class StorageURL : public shared_ptr_helper<StorageURL>, public IStorageURLBase

View File

@ -114,7 +114,7 @@ Pipe StorageXDBC::read(
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
bridge_helper->startBridgeSync();
@ -130,7 +130,7 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM
request_uri.addQueryParameter("format_name", format_name);
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLBlockOutputStream>(
return std::make_shared<StorageURLSink>(
request_uri,
format_name,
getFormatSettings(local_context),

View File

@ -33,7 +33,7 @@ public:
ContextPtr context_,
BridgeHelperPtr bridge_helper_);
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
std::string getName() const override;
private:

View File

@ -1,7 +1,7 @@
#include <gtest/gtest.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/tests/gtest_disk.h>
@ -100,7 +100,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::ContextPtr con
block.insert(column);
}
BlockOutputStreamPtr out = table->write({}, metadata_snapshot, context);
auto out = std::make_shared<PushingToSinkBlockOutputStream>(table->write({}, metadata_snapshot, context));
out->write(block);
out->writeSuffix();

View File

@ -16,8 +16,8 @@ SRCS(
ColumnsDescription.cpp
ConstraintsDescription.cpp
Distributed/DirectoryMonitor.cpp
Distributed/DistributedBlockOutputStream.cpp
Distributed/DistributedSettings.cpp
Distributed/DistributedSink.cpp
IStorage.cpp
IndicesDescription.cpp
JoinSettings.cpp
@ -41,7 +41,6 @@ SRCS(
MergeTree/MergeAlgorithm.cpp
MergeTree/MergeList.cpp
MergeTree/MergeTreeBaseSelectProcessor.cpp
MergeTree/MergeTreeBlockOutputStream.cpp
MergeTree/MergeTreeBlockReadUtils.cpp
MergeTree/MergeTreeData.cpp
MergeTree/MergeTreeDataMergerMutator.cpp
@ -87,7 +86,8 @@ SRCS(
MergeTree/MergeTreeSelectProcessor.cpp
MergeTree/MergeTreeSequentialSource.cpp
MergeTree/MergeTreeSettings.cpp
MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp
MergeTree/MergeTreeSink.cpp
MergeTree/MergeTreeThreadSelectProcessor.cpp
MergeTree/MergeTreeWhereOptimizer.cpp
MergeTree/MergeTreeWriteAheadLog.cpp
MergeTree/MergeType.cpp
@ -99,7 +99,6 @@ SRCS(
MergeTree/ReplicatedFetchList.cpp
MergeTree/ReplicatedMergeTreeAddress.cpp
MergeTree/ReplicatedMergeTreeAltersSequence.cpp
MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
MergeTree/ReplicatedMergeTreeCleanupThread.cpp
MergeTree/ReplicatedMergeTreeLogEntry.cpp
MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp
@ -108,6 +107,7 @@ SRCS(
MergeTree/ReplicatedMergeTreePartHeader.cpp
MergeTree/ReplicatedMergeTreeQueue.cpp
MergeTree/ReplicatedMergeTreeRestartingThread.cpp
MergeTree/ReplicatedMergeTreeSink.cpp
MergeTree/ReplicatedMergeTreeTableMetadata.cpp
MergeTree/SimpleMergeSelector.cpp
MergeTree/TTLMergeSelector.cpp