Fix more tests.

This commit is contained in:
Nikolai Kochetov 2021-07-26 17:47:29 +03:00
parent fa1c223269
commit 0eb563dc1b
15 changed files with 44 additions and 55 deletions

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <iostream>
namespace DB
{
@ -19,17 +19,21 @@ public:
if (!port.isConnected())
writePrefix();
if (!block)
return;
size_t num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
port.push(std::move(chunk));
while (true)
{
switch (auto status = sink->prepare())
auto status = sink->prepare();
switch (status)
{
case IProcessor::Status::Ready:
sink->work();
break;
continue;
case IProcessor::Status::NeedData:
return;
case IProcessor::Status::Async: [[fallthrough]];
@ -50,11 +54,12 @@ public:
while (true)
{
switch (auto status = sink->prepare())
auto status = sink->prepare();
switch (status)
{
case IProcessor::Status::Ready:
sink->work();
break;
continue;
case IProcessor::Status::NeedData:
return;
case IProcessor::Status::Async: [[fallthrough]];
@ -71,15 +76,18 @@ public:
void writeSuffix() override
{
sink->getPort().close();
port.finish();
while (true)
{
switch (auto status = sink->prepare())
auto status = sink->prepare();
switch (status)
{
case IProcessor::Status::Ready:
sink->work();
break;
continue;
case IProcessor::Status::Finished:
///flush();
return;
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:

View File

@ -128,7 +128,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (!no_destination)
{
auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), getContext());
replicated_output = dynamic_cast<ReplicatedMergeTreeSink *>(output.get());
replicated_output = dynamic_cast<ReplicatedMergeTreeSink *>(sink.get());
output = std::make_shared<PushingToSinkBlockOutputStream>(std::move(sink));
}
}

View File

@ -11,6 +11,9 @@ ISink::ISink(Block header)
ISink::Status ISink::prepare()
{
if (!was_on_start_called)
return Status::Ready;
if (has_input)
return Status::Ready;
@ -33,15 +36,20 @@ ISink::Status ISink::prepare()
void ISink::work()
{
if (has_input)
if (!was_on_start_called)
{
consume(std::move(current_chunk));
has_input = false;
was_on_start_called = true;
onStart();
}
else
else if (has_input)
{
has_input = false;
consume(std::move(current_chunk));
}
else if (!was_on_finish_called)
{
onFinish();
was_on_finish_called = true;
onFinish();
}
}

View File

@ -12,10 +12,11 @@ protected:
InputPort & input;
Chunk current_chunk;
bool has_input = false;
bool was_on_start_called = false;
bool was_on_finish_called = false;
virtual void consume(Chunk block) = 0;
virtual void onStart() {}
virtual void onFinish() {}
public:

View File

@ -95,10 +95,7 @@ DistributedSink::DistributedSink(
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_)
: SinkToStorage(
context_->getSettingsRef().insert_allow_materialized_columns
? metadata_snapshot_->getSampleBlock()
: metadata_snapshot_->getSampleBlockNonMaterialized())
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, context(Context::createCopy(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)

View File

@ -35,11 +35,6 @@ void KafkaSink::onStart()
void KafkaSink::consume(Chunk chunk)
{
if (is_first_chunk)
{
onStart();
is_first_chunk = false;
}
child->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}

View File

@ -15,7 +15,7 @@ public:
const std::shared_ptr<const Context> & context_);
void consume(Chunk chunk) override;
void onStart();
void onStart() override;
void onFinish() override;
String getName() const override { return "KafkaSink"; }
@ -27,8 +27,6 @@ private:
const std::shared_ptr<const Context> context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
bool is_first_chunk = true;
};
}

View File

@ -17,11 +17,6 @@ void MergeTreeSink::onStart()
void MergeTreeSink::consume(Chunk chunk)
{
if (is_first_chunk)
{
onStart();
is_first_chunk = false;
}
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);

View File

@ -29,14 +29,13 @@ public:
String getName() const override { return "MergeTreeSink"; }
void consume(Chunk chunk) override;
void onStart();
void onStart() override;
private:
StorageMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
ContextPtr context;
bool is_first_chunk = true;
};
}

View File

@ -118,12 +118,6 @@ void ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zoo
void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
if (is_first_chunk)
{
onStart();
is_first_chunk = false;
}
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
last_block_is_duplicate = false;

View File

@ -35,7 +35,7 @@ public:
// needed to set the special LogEntryType::ATTACH_PART
bool is_attach_ = false);
void onStart();
void onStart() override;
void consume(Chunk chunk) override;
String getName() const override { return "ReplicatedMergeTreeSink"; }
@ -80,7 +80,6 @@ private:
bool quorum_parallel = false;
bool deduplicate = true;
bool last_block_is_duplicate = false;
bool is_first_chunk = true;
using Logger = Poco::Logger;
Poco::Logger * log;

View File

@ -43,12 +43,6 @@ void RabbitMQSink::onStart()
void RabbitMQSink::consume(Chunk chunk)
{
if (is_first_chunk)
{
onStart();
is_first_chunk = false;
}
child->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}

View File

@ -13,7 +13,7 @@ class RabbitMQSink : public SinkToStorage
public:
explicit RabbitMQSink(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, ContextPtr context_);
void onStart();
void onStart() override;
void consume(Chunk chunk) override;
void onFinish() override;
@ -25,7 +25,5 @@ private:
ContextPtr context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
bool is_first_chunk = true;
};
}

View File

@ -516,7 +516,7 @@ Pipe StorageFile::read(
}
class StorageFileSink : public SinkToStorage
class StorageFileSink final : public SinkToStorage
{
public:
explicit StorageFileSink(
@ -566,12 +566,15 @@ public:
String getName() const override { return "StorageFileSink"; }
void consume(Chunk chunk) override
void onStart() override
{
if (!prefix_written)
writer->writePrefix();
prefix_written = true;
}
void consume(Chunk chunk) override
{
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}

View File

@ -24,8 +24,8 @@ SET allow_experimental_bigint_types=1;
CREATE TABLE dist (n Int128) ENGINE=Distributed(test_cluster_two_shards, currentDatabase(), mv);
INSERT INTO src SELECT number, toString(number) FROM numbers(1000);
INSERT INTO mv SELECT toString(number + 1000) FROM numbers(1000); -- { serverError 53 }
INSERT INTO mv SELECT arrayJoin(['42', 'test']); -- { serverError 53 }
INSERT INTO mv SELECT toString(number + 1000) FROM numbers(1000); -- { serverError 49 }
INSERT INTO mv SELECT arrayJoin(['42', 'test']); -- { serverError 49 }
SELECT count(), sum(n), sum(toInt64(s)), max(n), min(n) FROM src;
SELECT count(), sum(n), sum(toInt64(s)), max(n), min(n) FROM dst;