Fix problems introduced by the Pipe refactoring: in StorageMerge and StorageS3, replace resize() with narrowPipe()/ConcatProcessor so that reading from many tables/buckets does not open too many files or handles at the same time.

This commit is contained in:
Amos Bird 2020-10-12 17:58:09 +08:00
parent d2dcfc3f0d
commit 47fcd8bffb
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
5 changed files with 95 additions and 4 deletions

View File

@ -8,7 +8,7 @@
#include <common/logger_useful.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>

View File

@ -0,0 +1,62 @@
#include <algorithm>
#include <random>
#include <Common/thread_local_rng.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Pipe.h>
#include "narrowBlockInputStreams.h"
namespace DB
{
namespace
{
using Distribution = std::vector<size_t>;
/// Produce a random assignment of `from` source indices onto `to` buckets.
/// The pre-shuffle fill cycles through 0..to-1, so when from >= to every
/// bucket is guaranteed to receive at least one source.
Distribution getDistribution(size_t from, size_t to)
{
    Distribution result(from);
    for (size_t pos = 0; pos < result.size(); ++pos)
        result[pos] = pos % to;
    std::shuffle(result.begin(), result.end(), thread_local_rng);
    return result;
}
}
/// Reduce the number of output ports of `pipe` to at most `width` by gluing
/// randomly chosen groups of outputs into ConcatProcessors (each concat reads
/// its inputs one after another). Grouping is uniform-random to avoid skew
/// when the data volume per source follows a pattern. No-op when the pipe
/// already has `width` or fewer outputs.
void narrowPipe(Pipe & pipe, size_t width)
{
    size_t source_count = pipe.numOutputPorts();
    if (source_count <= width)
        return;

    std::vector<std::vector<OutputPort *>> partitions(width);
    auto distribution = getDistribution(source_count, width);

    pipe.transform([&](OutputPortRawPtrs ports)
    {
        /// Scatter the original outputs into `width` random groups.
        for (size_t source = 0; source < source_count; ++source)
            partitions[distribution[source]].emplace_back(ports[source]);

        Processors concats;
        concats.reserve(width);

        /// One ConcatProcessor per group; wire the group's outputs to it
        /// in order.
        for (auto & partition : partitions)
        {
            auto concat = std::make_shared<ConcatProcessor>(
                partition.at(0)->getHeader(), partition.size());

            auto out_it = partition.begin();
            for (auto & input : concat->getInputs())
            {
                connect(**out_it, input);
                ++out_it;
            }

            concats.emplace_back(std::move(concat));
        }

        return concats;
    });
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class Pipe;
/** If the number of output ports of `pipe` is greater than `width`,
 * then glues the sources to each other (using ConcatProcessor),
 * so that the number of outputs becomes no more than `width`.
*
* Trying to glue the sources with each other uniformly randomly.
* (to avoid overweighting if the distribution of the amount of data in different sources is subject to some pattern)
*/
void narrowPipe(Pipe & pipe, size_t width);
}

View File

@ -1,3 +1,4 @@
#include <DataStreams/narrowBlockInputStreams.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Storages/StorageMerge.h>
@ -21,6 +22,7 @@
#include <algorithm>
#include <Parsers/queryToString.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/AddingConstColumnTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
@ -265,7 +267,9 @@ Pipe StorageMerge::read(
auto pipe = Pipe::unitePipes(std::move(pipes));
if (!pipe.empty())
pipe.resize(num_streams);
// It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
// Using narrowPipe instead.
narrowPipe(pipe, num_streams);
return pipe;
}
@ -339,7 +343,9 @@ Pipe StorageMerge::createSources(
if (!pipe.empty())
{
if (concat_streams && pipe.numOutputPorts() > 1)
pipe.resize(1);
// It's possible to have many tables read from merge, resize(1) might open too many files at the same time.
// Using concat instead.
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
if (has_table_virtual_column)
{

View File

@ -20,6 +20,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
#include <DataTypes/DataTypeString.h>
@ -319,7 +320,9 @@ Pipe StorageS3::read(
key));
auto pipe = Pipe::unitePipes(std::move(pipes));
pipe.resize(num_streams);
// It's possible to have many buckets read from s3, resize(num_streams) might open too many handles at the same time.
// Using narrowPipe instead.
narrowPipe(pipe, num_streams);
return pipe;
}