Add a test.

This commit is contained in:
Nikolai Kochetov 2022-05-31 17:56:48 +00:00
parent 147a819221
commit 32010f0ba8
5 changed files with 43 additions and 2 deletions

View File

@ -10,7 +10,6 @@ class ISource : public IProcessor
{
private:
ReadProgressCounters read_progress;
std::shared_ptr<const StorageLimitsList> storage_limits;
bool read_progress_was_set = false;
bool auto_progress;
@ -21,6 +20,8 @@ protected:
bool got_exception = false;
Port::Data current_chunk;
std::shared_ptr<const StorageLimitsList> storage_limits;
virtual Chunk generate();
virtual std::optional<Chunk> tryGenerate();
@ -36,7 +37,7 @@ public:
OutputPort & getPort() { return output; }
const OutputPort & getPort() const { return output; }
void setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_) final;
void setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_) override;
/// Default implementation for all the sources.
std::optional<ReadProgress> getReadProgress() final;

View File

@ -1,6 +1,7 @@
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
#include <QueryPipeline/StreamLocalLimits.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -21,6 +22,16 @@ RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation
RemoteSource::~RemoteSource() = default;
void RemoteSource::setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_)
{
/// Remove leaf limits for remote source.
StorageLimitsList list;
for (const auto & value : *storage_limits_)
list.emplace_back(StorageLimits{value.local_limits, {}});
storage_limits = std::make_shared<const StorageLimitsList>(std::move(list));
}
ISource::Status RemoteSource::prepare()
{
/// Check if query was cancelled before returning Async status. Otherwise it may lead to infinite loop.

View File

@ -33,6 +33,8 @@ public:
int schedule() override { return fd; }
void setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_) override;
protected:
std::optional<Chunk> tryGenerate() override;
void onCancel() override;

View File

@ -0,0 +1,2 @@
45
45

View File

@ -0,0 +1,25 @@
SELECT sum(x)
FROM
(
SELECT x
FROM
(
SELECT number AS x
FROM system.numbers
SETTINGS max_rows_to_read = 10, read_overflow_mode = 'break', max_block_size = 2
)
SETTINGS max_rows_to_read = 20, read_overflow_mode = 'break', max_block_size = 2
);
SELECT sum(x)
FROM
(
SELECT x
FROM
(
SELECT number AS x
FROM system.numbers
SETTINGS max_rows_to_read = 20, read_overflow_mode = 'break', max_block_size = 2
)
SETTINGS max_rows_to_read = 10, read_overflow_mode = 'break', max_block_size = 2
);