Update Storage::write

This commit is contained in:
Nikolai Kochetov 2021-07-23 17:25:35 +03:00
parent 0f78a14630
commit 2dc5c89b66
44 changed files with 298 additions and 268 deletions

View File

@ -14,7 +14,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <filesystem>

View File

@ -16,7 +16,9 @@ ISink::Status ISink::prepare()
if (input.isFinished())
{
onFinish();
if (!was_on_finish_called)
return Status::Ready;
return Status::Finished;
}
@ -31,9 +33,16 @@ ISink::Status ISink::prepare()
/// Processes one scheduling step of the sink:
/// - while input chunks keep arriving, pass each one to consume();
/// - once prepare() signalled exhausted input (has_input == false),
///   run onFinish() and remember that it ran, so prepare() can return
///   Finished instead of scheduling work() again.
void ISink::work()
{
    if (has_input)
    {
        consume(std::move(current_chunk));
        has_input = false;
    }
    else
    {
        onFinish();
        was_on_finish_called = true;
    }
}
}

View File

@ -12,6 +12,7 @@ protected:
InputPort & input;
Chunk current_chunk;
bool has_input = false;
bool was_on_finish_called = false;
virtual void consume(Chunk block) = 0;

View File

@ -4,7 +4,8 @@
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h>

View File

@ -0,0 +1,18 @@
#pragma once
#include <Processors/ISink.h>
namespace DB
{
/// Sink which reads everything and does nothing with it: every consumed
/// chunk is simply discarded.
class EmptySink : public ISink
{
public:
    /// The header is only used to describe the input port's structure.
    explicit EmptySink(Block header) : ISink(std::move(header)) {}
    String getName() const override { return "EmptySink"; }

protected:
    /// Drop the chunk.
    void consume(Chunk) override {}
};
}

View File

@ -1,5 +1,4 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/ISink.h>
namespace DB
@ -21,15 +20,4 @@ protected:
void consume(Chunk) override {}
};
/// Sink which reads everything and do nothing with it.
class EmptySink : public ISink
{
public:
explicit EmptySink(Block header) : ISink(std::move(header)) {}
String getName() const override { return "EmptySink"; }
protected:
void consume(Chunk) override {}
};
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <Processors/ISink.h>
#include <Storages/TableLockHolder.h>
namespace DB
{
/// Base class for sinks which write data into a table (returned by IStorage::write).
/// In addition to the ISink interface it can hold table locks, keeping them
/// alive for the lifetime of the sink while the insert is running.
class SinkToStorage : public ISink
{
public:
    using ISink::ISink;

    /// Keep the given table lock until this sink is destroyed.
    void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

private:
    std::vector<TableLockHolder> table_locks;
};
using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
}

View File

@ -1,6 +1,6 @@
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/ResizeProcessor.h>
namespace DB

View File

@ -1,7 +1,7 @@
#include <gtest/gtest.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Columns/ColumnsNumber.h>

View File

@ -86,7 +86,7 @@ static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & bl
}
DistributedBlockOutputStream::DistributedBlockOutputStream(
DistributedSink::DistributedSink(
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -95,7 +95,11 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_)
: context(Context::createCopy(context_))
: SinkToStorage(
context_->getSettingsRef().insert_allow_materialized_columns
? metadata_snapshot_->getSampleBlock()
: metadata_snapshot_->getSampleBlockNonMaterialized())
, context(Context::createCopy(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(query_ast_)
@ -115,24 +119,15 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
}
Block DistributedBlockOutputStream::getHeader() const
void DistributedSink::consume(Chunk chunk)
{
if (!allow_materialized)
return metadata_snapshot->getSampleBlockNonMaterialized();
else
return metadata_snapshot->getSampleBlock();
}
if (is_first_chunk)
{
storage.delayInsertOrThrowIfNeeded();
is_first_chunk = false;
}
void DistributedBlockOutputStream::writePrefix()
{
storage.delayInsertOrThrowIfNeeded();
}
void DistributedBlockOutputStream::write(const Block & block)
{
Block ordinary_block{ block };
auto ordinary_block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (!allow_materialized)
{
@ -155,7 +150,7 @@ void DistributedBlockOutputStream::write(const Block & block)
writeAsync(ordinary_block);
}
void DistributedBlockOutputStream::writeAsync(const Block & block)
void DistributedSink::writeAsync(const Block & block)
{
if (random_shard_insert)
{
@ -174,7 +169,7 @@ void DistributedBlockOutputStream::writeAsync(const Block & block)
}
std::string DistributedBlockOutputStream::getCurrentStateDescription()
std::string DistributedSink::getCurrentStateDescription()
{
WriteBufferFromOwnString buffer;
const auto & addresses = cluster->getShardsAddresses();
@ -203,7 +198,7 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
}
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
void DistributedSink::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
const Settings & settings = context->getSettingsRef();
const auto & addresses_with_failovers = cluster->getShardsAddresses();
@ -249,7 +244,7 @@ void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, si
}
void DistributedBlockOutputStream::waitForJobs()
void DistributedSink::waitForJobs()
{
pool->wait();
@ -279,7 +274,7 @@ void DistributedBlockOutputStream::waitForJobs()
ThreadPool::Job
DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards)
DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, size_t num_shards)
{
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
@ -403,7 +398,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
}
void DistributedBlockOutputStream::writeSync(const Block & block)
void DistributedSink::writeSync(const Block & block)
{
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
@ -487,7 +482,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
}
void DistributedBlockOutputStream::writeSuffix()
void DistributedSink::onFinish()
{
auto log_performance = [this]()
{
@ -537,7 +532,7 @@ void DistributedBlockOutputStream::writeSuffix()
}
IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block) const
IColumn::Selector DistributedSink::createSelector(const Block & source_block) const
{
Block current_block_with_sharding_key_expr = source_block;
storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);
@ -548,7 +543,7 @@ IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & sou
}
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
Blocks DistributedSink::splitBlock(const Block & block)
{
auto selector = createSelector(block);
@ -572,7 +567,7 @@ Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
}
void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
void DistributedSink::writeSplitAsync(const Block & block)
{
Blocks splitted_blocks = splitBlock(block);
const size_t num_shards = splitted_blocks.size();
@ -585,7 +580,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
}
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
@ -621,7 +616,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
}
void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
{
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
@ -633,7 +628,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repe
}
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();

View File

@ -1,7 +1,7 @@
#pragma once
#include <Parsers/formatAST.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Core/Block.h>
#include <Common/PODArray.h>
@ -34,10 +34,10 @@ class StorageDistributed;
* and the resulting blocks are written in a compressed Native format in separate directories for sending.
* For each destination address (each directory with data to send), a separate thread is created in StorageDistributed,
* which monitors the directory and sends data. */
class DistributedBlockOutputStream : public IBlockOutputStream
class DistributedSink : public SinkToStorage
{
public:
DistributedBlockOutputStream(
DistributedSink(
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -47,11 +47,8 @@ public:
UInt64 insert_timeout_,
StorageID main_table_);
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;
void consume(Chunk chunk) override;
void onFinish() override;
private:
IColumn::Selector createSelector(const Block & source_block) const;
@ -77,7 +74,7 @@ private:
void initWritingJobs(const Block & first_block, size_t start, size_t end);
struct JobReplica;
ThreadPool::Job runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards);
ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block, size_t num_shards);
void waitForJobs();
@ -97,6 +94,8 @@ private:
bool random_shard_insert;
bool allow_materialized;
bool is_first_chunk = true;
/// Sync-related stuff
UInt64 insert_timeout; // in seconds
StorageID main_table;

View File

@ -51,6 +51,9 @@ class Pipe;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
class SinkToStorage;
using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
class QueryPipeline;
using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
@ -326,7 +329,7 @@ public:
* changed during lifetime of the returned streams, but the snapshot is
* guaranteed to be immutable.
*/
virtual BlockOutputStreamPtr write(
virtual SinkToStoragePtr write(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*context*/)

View File

@ -6,30 +6,26 @@
namespace DB
{
KafkaBlockOutputStream::KafkaBlockOutputStream(
KafkaSink::KafkaSink(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const ContextPtr & context_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
{
}
Block KafkaBlockOutputStream::getHeader() const
void KafkaSink::onStart()
{
return metadata_snapshot->getSampleBlockNonMaterialized();
}
void KafkaBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer(getHeader());
buffer = storage.createWriteBuffer(getPort().getHeader());
auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
getHeader(), context,
getPort().getHeader(), context,
[this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
@ -37,20 +33,22 @@ void KafkaBlockOutputStream::writePrefix()
format_settings);
}
void KafkaBlockOutputStream::write(const Block & block)
/// Writes one chunk to Kafka through the format output stream.
/// The write buffer and the child output stream are created lazily
/// (onStart) when the first chunk arrives.
void KafkaSink::consume(Chunk chunk)
{
    if (is_first_chunk)
    {
        onStart();
        is_first_chunk = false;
    }

    child->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void KafkaBlockOutputStream::writeSuffix()
/// Finalizes the insert: writes the format suffix (if any data was written,
/// i.e. the child stream exists) and flushes the producer buffer.
void KafkaSink::onFinish()
{
    if (child)
        child->writeSuffix();

    if (buffer)
        buffer->flush();
}

View File

@ -1,26 +1,25 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/Kafka/StorageKafka.h>
namespace DB
{
class KafkaBlockOutputStream : public IBlockOutputStream
class KafkaSink : public SinkToStorage
{
public:
explicit KafkaBlockOutputStream(
explicit KafkaSink(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<const Context> & context_);
Block getHeader() const override;
void consume(Chunk chunk) override;
void onStart();
void onFinish() override;
String getName() const override { return "KafkaSink"; }
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
void flush() override;
///void flush() override;
private:
StorageKafka & storage;
@ -28,6 +27,8 @@ private:
const std::shared_ptr<const Context> context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
bool is_first_chunk = true;
};
}

View File

@ -289,14 +289,14 @@ Pipe StorageKafka::read(
}
BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
/// Creates a sink that produces rows into the (single) Kafka topic.
/// Writing to a multi-topic table is ambiguous and therefore rejected.
SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
    auto modified_context = Context::createCopy(local_context);
    modified_context->applySettingsChanges(settings_adjustments);

    if (topics.size() > 1)
        throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);

    return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context);
}

View File

@ -50,7 +50,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;

View File

@ -8,24 +8,19 @@
namespace DB
{
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
RabbitMQSink::RabbitMQSink(
    StorageRabbitMQ & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    ContextPtr context_)
    /// Use the constructor argument here: the metadata_snapshot member is not
    /// yet initialized when the base class is constructed (bases are built
    /// before members), so reading it would dereference a null shared_ptr.
    : SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
    , storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , context(context_)
{
}
Block RabbitMQBlockOutputStream::getHeader() const
{
return metadata_snapshot->getSampleBlockNonMaterialized();
}
void RabbitMQBlockOutputStream::writePrefix()
void RabbitMQSink::onStart()
{
if (!storage.exchangeRemoved())
storage.unbindExchange();
@ -37,7 +32,7 @@ void RabbitMQBlockOutputStream::writePrefix()
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
getHeader(), context,
getPort().getHeader(), context,
[this](const Columns & /* columns */, size_t /* rows */)
{
buffer->countRow();
@ -46,13 +41,19 @@ void RabbitMQBlockOutputStream::writePrefix()
}
void RabbitMQBlockOutputStream::write(const Block & block)
/// Writes one chunk to RabbitMQ through the format output stream.
/// Exchange setup and buffer/stream creation happen lazily (onStart)
/// when the first chunk arrives.
void RabbitMQSink::consume(Chunk chunk)
{
    if (is_first_chunk)
    {
        onStart();
        is_first_chunk = false;
    }

    child->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void RabbitMQBlockOutputStream::writeSuffix()
void RabbitMQSink::onFinish()
{
child->writeSuffix();

View File

@ -1,23 +1,23 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
namespace DB
{
class RabbitMQBlockOutputStream : public IBlockOutputStream
class RabbitMQSink : public SinkToStorage
{
public:
explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, ContextPtr context_);
explicit RabbitMQSink(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, ContextPtr context_);
Block getHeader() const override;
void onStart();
void consume(Chunk chunk) override;
void onFinish() override;
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
String getName() const override { return "RabbitMQSink"; }
private:
StorageRabbitMQ & storage;
@ -25,5 +25,7 @@ private:
ContextPtr context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
bool is_first_chunk = true;
};
}

View File

@ -644,9 +644,9 @@ Pipe StorageRabbitMQ::read(
}
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
/// Creates a sink that publishes inserted rows to RabbitMQ.
SinkToStoragePtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
    return std::make_shared<RabbitMQSink>(*this, metadata_snapshot, local_context);
}

View File

@ -50,7 +50,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context) override;

View File

@ -13,14 +13,14 @@ namespace ErrorCodes
extern const int ROCKSDB_ERROR;
}
EmbeddedRocksDBBlockOutputStream::EmbeddedRocksDBBlockOutputStream(
EmbeddedRocksDBSink::EmbeddedRocksDBSink(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{
Block sample_block = metadata_snapshot->getSampleBlock();
for (const auto & elem : sample_block)
for (const auto & elem : getPort().getHeader())
{
if (elem.name == storage.primary_key)
break;
@ -28,15 +28,10 @@ EmbeddedRocksDBBlockOutputStream::EmbeddedRocksDBBlockOutputStream(
}
}
Block EmbeddedRocksDBBlockOutputStream::getHeader() const
void EmbeddedRocksDBSink::consume(Chunk chunk)
{
return metadata_snapshot->getSampleBlock();
}
void EmbeddedRocksDBBlockOutputStream::write(const Block & block)
{
metadata_snapshot->check(block, true);
auto rows = block.rows();
auto rows = chunk.getNumRows();
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
@ -10,15 +10,15 @@ class StorageEmbeddedRocksDB;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
class EmbeddedRocksDBBlockOutputStream : public IBlockOutputStream
class EmbeddedRocksDBSink : public SinkToStorage
{
public:
EmbeddedRocksDBBlockOutputStream(
EmbeddedRocksDBSink(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_);
Block getHeader() const override;
void write(const Block & block) override;
void consume(Chunk chunk) override;
String getName() const override { return "EmbeddedRocksDBSink"; }
private:
StorageEmbeddedRocksDB & storage;

View File

@ -333,10 +333,10 @@ Pipe StorageEmbeddedRocksDB::read(
}
}
BlockOutputStreamPtr StorageEmbeddedRocksDB::write(
/// Creates a sink that writes key-value pairs into the RocksDB instance.
SinkToStoragePtr StorageEmbeddedRocksDB::write(
    const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
    return std::make_shared<EmbeddedRocksDBSink>(*this, metadata_snapshot);
}

View File

@ -20,7 +20,7 @@ class StorageEmbeddedRocksDB final : public shared_ptr_helper<StorageEmbeddedRoc
{
friend struct shared_ptr_helper<StorageEmbeddedRocksDB>;
friend class EmbeddedRocksDBSource;
friend class EmbeddedRocksDBBlockOutputStream;
friend class EmbeddedRocksDBSink;
friend class EmbeddedRocksDBBlockInputStream;
public:
std::string getName() const override { return "EmbeddedRocksDB"; }
@ -34,7 +34,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
bool supportsParallelInsert() const override { return true; }

View File

@ -25,6 +25,7 @@
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/UnionStep.h>
@ -513,30 +514,28 @@ static void appendBlock(const Block & from, Block & to)
}
class BufferBlockOutputStream : public IBlockOutputStream
class BufferSink : public SinkToStorage
{
public:
explicit BufferBlockOutputStream(
explicit BufferSink(
StorageBuffer & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
if (!block)
return;
// Check table structure.
metadata_snapshot->check(block, true);
metadata_snapshot->check(getPort().getHeader(), true);
}
size_t rows = block.rows();
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
if (!rows)
return;
auto block = getPort().getHeader().cloneWithColumns(chunk.getColumns());
StoragePtr destination;
if (storage.destination_id)
{
@ -642,9 +641,9 @@ private:
};
BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
/// Creates the write-path sink for the Buffer table.
/// Note: this must construct BufferSink (defined above), not BufferSource —
/// BufferSource is the read-path class and does not match these arguments.
SinkToStoragePtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
    return std::make_shared<BufferSink>(*this, metadata_snapshot);
}

View File

@ -46,7 +46,7 @@ class StorageBuffer final : public shared_ptr_helper<StorageBuffer>, public ISto
{
friend struct shared_ptr_helper<StorageBuffer>;
friend class BufferSource;
friend class BufferBlockOutputStream;
friend class BufferSink;
public:
struct Thresholds
@ -84,7 +84,7 @@ public:
bool supportsSubcolumns() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void startup() override;
/// Flush all buffers into the subordinate table and stop background thread.

View File

@ -59,7 +59,7 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <Core/Field.h>
#include <Core/Settings.h>
@ -631,7 +631,7 @@ void StorageDistributed::read(
}
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
auto cluster = getCluster();
const auto & settings = local_context->getSettingsRef();
@ -669,7 +669,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
sample_block = metadata_snapshot->getSampleBlock();
/// DistributedSink will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
return std::make_shared<DistributedSink>(
local_context, *this, metadata_snapshot,
createInsertToRemoteTableQuery(remote_database, remote_table, sample_block),
cluster, insert_sync, timeout, StorageID{remote_database, remote_table});

View File

@ -39,7 +39,7 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class StorageDistributed final : public shared_ptr_helper<StorageDistributed>, public IStorage, WithContext
{
friend struct shared_ptr_helper<StorageDistributed>;
friend class DistributedBlockOutputStream;
friend class DistributedSink;
friend class StorageDistributedDirectoryMonitor;
friend class StorageSystemDistributionQueue;
@ -81,7 +81,7 @@ public:
bool supportsParallelInsert() const override { return true; }
std::optional<UInt64> totalBytes(const Settings &) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
QueryPipelinePtr distributedWrite(const ASTInsertQuery & query, ContextPtr context) override;

View File

@ -15,7 +15,7 @@
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Common/escapeForFileName.h>
@ -516,10 +516,10 @@ Pipe StorageFile::read(
}
class StorageFileBlockOutputStream : public IBlockOutputStream
class StorageFileSink : public SinkToStorage
{
public:
explicit StorageFileBlockOutputStream(
explicit StorageFileSink(
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
@ -527,7 +527,8 @@ public:
ContextPtr context,
const std::optional<FormatSettings> & format_settings,
int & flags)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_))
{
@ -563,29 +564,24 @@ public:
{}, format_settings);
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
writer->write(block);
}
void writePrefix() override
/// Writes one chunk through the format writer.
/// The format prefix is emitted exactly once, before the first block.
void consume(Chunk chunk) override
{
    if (!prefix_written)
        writer->writePrefix();
    prefix_written = true;

    writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void writeSuffix() override
/// Finalizes the output format (writes the format suffix) once all data is consumed.
void onFinish() override
{
    writer->writeSuffix();
}
void flush() override
{
writer->flush();
}
// void flush() override
// {
// writer->flush();
// }
private:
StorageFile & storage;
@ -596,7 +592,7 @@ private:
bool prefix_written{false};
};
BlockOutputStreamPtr StorageFile::write(
SinkToStoragePtr StorageFile::write(
const ASTPtr & /*query*/,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context)
@ -616,7 +612,7 @@ BlockOutputStreamPtr StorageFile::write(
fs::create_directories(fs::path(path).parent_path());
}
return std::make_shared<StorageFileBlockOutputStream>(
return std::make_shared<StorageFileSink>(
*this,
metadata_snapshot,
std::unique_lock{rwlock, getLockTimeout(context)},

View File

@ -29,7 +29,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;
@ -68,7 +68,7 @@ public:
protected:
friend class StorageFileSource;
friend class StorageFileBlockOutputStream;
friend class StorageFileSink;
/// From file descriptor
StorageFile(int table_fd_, CommonArguments args);

View File

@ -45,7 +45,7 @@ public:
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
Pipe read(
const Names & column_names,

View File

@ -12,6 +12,7 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Common/filesystemHelpers.h>
@ -77,25 +78,25 @@ Pipe StorageSQLite::read(
}
class SQLiteBlockOutputStream : public IBlockOutputStream
class SQLiteSink : public SinkToStorage
{
public:
explicit SQLiteBlockOutputStream(
explicit SQLiteSink(
const StorageSQLite & storage_,
const StorageMetadataPtr & metadata_snapshot_,
StorageSQLite::SQLitePtr sqlite_db_,
const String & remote_table_name_)
: storage{storage_}
: SinkToStorage(metadata_snapshot->getSampleBlock())
, storage{storage_}
, metadata_snapshot(metadata_snapshot_)
, sqlite_db(sqlite_db_)
, remote_table_name(remote_table_name_)
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
void consume(Chunk chunk) override
{
auto block = getPort().getHeader().cloneWithColumns(chunk.getColumns());
WriteBufferFromOwnString sqlbuf;
sqlbuf << "INSERT INTO ";
@ -137,9 +138,9 @@ private:
};
BlockOutputStreamPtr StorageSQLite::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr)
/// Creates a sink that forwards inserted rows to the remote SQLite table.
SinkToStoragePtr StorageSQLite::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr)
{
    return std::make_shared<SQLiteSink>(*this, metadata_snapshot, sqlite_db, remote_table_name);
}

View File

@ -40,7 +40,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
private:
String remote_table_name;

View File

@ -13,6 +13,7 @@
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
@ -30,17 +31,16 @@ namespace ErrorCodes
}
class SetOrJoinBlockOutputStream : public IBlockOutputStream
class SetOrJoinSink : public SinkToStorage
{
public:
SetOrJoinBlockOutputStream(
SetOrJoinSink(
StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_, const String & backup_tmp_path_,
const String & backup_file_name_, bool persistent_);
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
void writeSuffix() override;
void consume(Chunk chunk) override;
void onFinish() override;
private:
StorageSetOrJoinBase & table;
@ -55,14 +55,15 @@ private:
};
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
SetOrJoinSink::SetOrJoinSink(
StorageSetOrJoinBase & table_,
const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_,
const String & backup_tmp_path_,
const String & backup_file_name_,
bool persistent_)
: table(table_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, table(table_)
, metadata_snapshot(metadata_snapshot_)
, backup_path(backup_path_)
, backup_tmp_path(backup_tmp_path_)
@ -74,17 +75,17 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
{
}
void SetOrJoinBlockOutputStream::write(const Block & block)
void SetOrJoinSink::consume(Chunk chunk)
{
/// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
Block sorted_block = block.sortColumns();
Block sorted_block = getPort().getHeader().cloneWithColumns(chunk.detachColumns()).sortColumns();
table.insertBlock(sorted_block);
if (persistent)
backup_stream.write(sorted_block);
}
void SetOrJoinBlockOutputStream::writeSuffix()
void SetOrJoinSink::onFinish()
{
table.finishInsert();
if (persistent)
@ -99,10 +100,10 @@ void SetOrJoinBlockOutputStream::writeSuffix()
}
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
UInt64 id = ++increment;
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin", persistent);
return std::make_shared<SetOrJoinSink>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin", persistent);
}

View File

@ -18,12 +18,12 @@ using SetPtr = std::shared_ptr<Set>;
*/
class StorageSetOrJoinBase : public IStorage
{
friend class SetOrJoinBlockOutputStream;
friend class SetOrJoinSink;
public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {path}; }

View File

@ -31,6 +31,7 @@
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h>
#include <cassert>
@ -153,12 +154,13 @@ private:
};
class StripeLogBlockOutputStream final : public IBlockOutputStream
class StripeLogSink final : public SinkToStorage
{
public:
explicit StripeLogBlockOutputStream(
explicit StripeLogSink(
StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_))
, data_out_file(storage.table_path + "data.bin")
@ -181,7 +183,9 @@ public:
}
}
~StripeLogBlockOutputStream() override
String getName() const override { return "StripeLogSink"; }
~StripeLogSink() override
{
try
{
@ -202,14 +206,12 @@ public:
}
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
void consume(Chunk chunk) override
{
block_out.write(block);
block_out.write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void writeSuffix() override
void onFinish() override
{
if (done)
return;
@ -368,13 +370,13 @@ Pipe StorageStripeLog::read(
}
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::unique_lock lock(rwlock, getLockTimeout(context));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
return std::make_shared<StripeLogSink>(*this, metadata_snapshot, std::move(lock));
}

View File

@ -19,7 +19,7 @@ namespace DB
class StorageStripeLog final : public shared_ptr_helper<StorageStripeLog>, public IStorage
{
friend class StripeLogSource;
friend class StripeLogBlockOutputStream;
friend class StripeLogSink;
friend struct shared_ptr_helper<StorageStripeLog>;
public:
@ -34,7 +34,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

View File

@ -113,7 +113,7 @@ public:
return pipe;
}
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context) override

View File

@ -23,8 +23,6 @@
#include <DataTypes/NestedUtils.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/Context.h>
@ -37,6 +35,7 @@
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
@ -192,14 +191,15 @@ void TinyLogSource::readData(const NameAndTypePair & name_and_type,
}
class TinyLogBlockOutputStream final : public IBlockOutputStream
class TinyLogSink final : public SinkToStorage
{
public:
explicit TinyLogBlockOutputStream(
explicit TinyLogSink(
StorageTinyLog & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_))
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@ -213,7 +213,7 @@ public:
}
}
~TinyLogBlockOutputStream() override
~TinyLogSink() override
{
try
{
@ -231,10 +231,10 @@ public:
}
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
String getName() const override { return "TinyLogSink"; }
void write(const Block & block) override;
void writeSuffix() override;
void consume(Chunk chunk) override;
void onFinish() override;
private:
StorageTinyLog & storage;
@ -274,7 +274,7 @@ private:
};
ISerialization::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
ISerialization::OutputStreamGetter TinyLogSink::createStreamGetter(
const NameAndTypePair & column,
WrittenStreams & written_streams)
{
@ -298,7 +298,7 @@ ISerialization::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
}
void TinyLogBlockOutputStream::writeData(const NameAndTypePair & name_and_type, const IColumn & column, WrittenStreams & written_streams)
void TinyLogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & column, WrittenStreams & written_streams)
{
ISerialization::SerializeBinaryBulkSettings settings;
const auto & [name, type] = name_and_type;
@ -318,7 +318,7 @@ void TinyLogBlockOutputStream::writeData(const NameAndTypePair & name_and_type,
}
void TinyLogBlockOutputStream::writeSuffix()
void TinyLogSink::onFinish()
{
if (done)
return;
@ -332,7 +332,7 @@ void TinyLogBlockOutputStream::writeSuffix()
WrittenStreams written_streams;
ISerialization::SerializeBinaryBulkSettings settings;
for (const auto & column : getHeader())
for (const auto & column : getPort().getHeader())
{
auto it = serialize_states.find(column.name);
if (it != serialize_states.end())
@ -365,8 +365,9 @@ void TinyLogBlockOutputStream::writeSuffix()
}
void TinyLogBlockOutputStream::write(const Block & block)
void TinyLogSink::consume(Chunk chunk)
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
metadata_snapshot->check(block, true);
/// The set of written offset columns so that you do not write shared columns for nested structures multiple times
@ -508,9 +509,9 @@ Pipe StorageTinyLog::read(
}
BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
return std::make_shared<TinyLogBlockOutputStream>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)});
return std::make_shared<TinyLogSink>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)});
}

View File

@ -18,7 +18,7 @@ namespace DB
class StorageTinyLog final : public shared_ptr_helper<StorageTinyLog>, public IStorage
{
friend class TinyLogSource;
friend class TinyLogBlockOutputStream;
friend class TinyLogSink;
friend struct shared_ptr_helper<StorageTinyLog>;
public:
@ -33,7 +33,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

View File

@ -140,14 +140,15 @@ namespace
};
}
StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block_,
ContextPtr context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: sample_block(sample_block_)
StorageURLSink::StorageURLSink(
const Poco::URI & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: SinkToStorage(sample_block)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
@ -157,17 +158,18 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
}
void StorageURLBlockOutputStream::write(const Block & block)
void StorageURLSink::consume(Chunk chunk)
{
writer->write(block);
if (is_first_chunk)
{
writer->writePrefix();
is_first_chunk = false;
}
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void StorageURLBlockOutputStream::writePrefix()
{
writer->writePrefix();
}
void StorageURLBlockOutputStream::writeSuffix()
void StorageURLSink::onFinish()
{
writer->writeSuffix();
writer->flush();
@ -289,9 +291,9 @@ Pipe StorageURLWithFailover::read(
}
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
return std::make_shared<StorageURLBlockOutputStream>(uri, format_name,
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri.toString(), compression_method));

View File

@ -3,7 +3,7 @@
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <common/shared_ptr_helper.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <Storages/StorageFactory.h>
@ -32,7 +32,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
protected:
IStorageURLBase(
@ -77,31 +77,26 @@ private:
virtual Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const = 0;
};
class StorageURLBlockOutputStream : public IBlockOutputStream
class StorageURLSink : public SinkToStorage
{
public:
StorageURLBlockOutputStream(
StorageURLSink(
const Poco::URI & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block_,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method);
Block getHeader() const override
{
return sample_block;
}
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;
void consume(Chunk chunk) override;
void onFinish() override;
private:
Block sample_block;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool is_first_chunk = true;
};
class StorageURL : public shared_ptr_helper<StorageURL>, public IStorageURLBase

View File

@ -114,7 +114,7 @@ Pipe StorageXDBC::read(
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
bridge_helper->startBridgeSync();
@ -130,7 +130,7 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM
request_uri.addQueryParameter("format_name", format_name);
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLBlockOutputStream>(
return std::make_shared<StorageURLSink>(
request_uri,
format_name,
getFormatSettings(local_context),

View File

@ -33,7 +33,7 @@ public:
ContextPtr context_,
BridgeHelperPtr bridge_helper_);
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
std::string getName() const override;
private: