Fix some resources.

Nikolai Kochetov 2021-09-10 15:38:30 +03:00
parent 285cfb7fd5
commit e6938127a6
10 changed files with 41 additions and 13 deletions

View File

@@ -63,8 +63,8 @@ void CheckConstraintsTransform::transform(Chunk & chunk)
/// Check if constraint value is nullable
const auto & null_map = column_nullable->getNullMapColumn();
- const PaddedPODArray<UInt8> & data = null_map.getData();
- bool null_map_contains_null = !memoryIsZero(data.raw_data(), data.size() * sizeof(UInt8));
+ const PaddedPODArray<UInt8> & null_map_data = null_map.getData();
+ bool null_map_contains_null = !memoryIsZero(null_map_data.raw_data(), null_map_data.size() * sizeof(UInt8));
if (null_map_contains_null)
throw Exception(
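For context, the renamed variable feeds an all-zero scan over the constraint column's null map: if any byte of the map is non-zero, some row evaluated to NULL and the insert is rejected. A minimal standalone sketch of that check, using std::vector and std::all_of in place of ClickHouse's PaddedPODArray and memoryIsZero (the names below are illustrative, not the real API):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/// Simplified stand-in for memoryIsZero: true if every byte in the range is zero.
static bool memory_is_zero(const uint8_t * data, size_t size)
{
    return std::all_of(data, data + size, [](uint8_t byte) { return byte == 0; });
}

int main()
{
    /// A null map stores one byte per row: 1 means the row is NULL, 0 means it is not.
    std::vector<uint8_t> null_map_data = {0, 0, 1, 0};

    bool null_map_contains_null = !memory_is_zero(null_map_data.data(), null_map_data.size());
    if (null_map_contains_null)
        std::cout << "constraint is violated: it is NULL for some row\n";
}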

View File

@@ -406,6 +406,7 @@ Chain buildPushingToViewsDrain(
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, insert_context, false, false, false, view_runtime_data);
BlockIO io = interpreter.execute();
+ io.out.attachResources(QueryPipeline::getPipe(std::move(io.pipeline)).detachResources());
out = std::move(io.out);
}
else if (auto * live_view = dynamic_cast<StorageLiveView *>(dependent_table.get()))
@@ -473,11 +474,14 @@ Chain buildPushingToViewsDrain(
auto out = copying_data->getOutputs().begin();
auto in = finalizing_views->getInputs().begin();
+ size_t max_parallel_streams = 0;
std::list<ProcessorPtr> processors;
for (auto & chain : chains)
{
- result_chain.attachResourcesFrom(chain);
+ max_parallel_streams += std::max<size_t>(chain.getNumThreads(), 1);
+ result_chain.attachResources(chain.detachResources());
connect(*out, chain.getInputPort());
connect(chain.getOutputPort(), *in);
++in;
@@ -488,6 +492,7 @@ Chain buildPushingToViewsDrain(
processors.emplace_front(std::move(copying_data));
processors.emplace_back(std::move(finalizing_views));
result_chain = Chain(std::move(processors));
+ result_chain.setNumThreads(max_parallel_streams);
}
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))

View File

@@ -377,7 +377,10 @@ BlockIO InterpreterInsertQuery::execute()
});
}
else
+ {
res.out = std::move(out_chains.at(0));
+ res.out.setNumThreads(std::min<size_t>(res.out.getNumThreads(), settings.max_threads));
+ }
res.pipeline.addStorageHolder(table);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
@@ -386,6 +389,9 @@ BlockIO InterpreterInsertQuery::execute()
res.pipeline.addStorageHolder(inner_table);
}
+ if (!res.out.empty())
+ res.out.attachResources(QueryPipeline::getPipe(std::move(res.pipeline)).detachResources());
return res;
}
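The single-chain branch above also caps the chain's declared parallelism by the max_threads setting. A trivial sketch of that clamp, with a made-up value standing in for settings.max_threads:

#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    size_t chain_threads = 16;       /// parallelism reported by the insert chain
    size_t max_threads_setting = 8;  /// hypothetical value of the max_threads setting

    /// Mirrors res.out.setNumThreads(std::min<size_t>(res.out.getNumThreads(), settings.max_threads)).
    size_t effective_threads = std::min(chain_threads, max_threads_setting);
    std::cout << "insert will run with " << effective_threads << " threads\n";
}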

View File

@@ -21,6 +21,9 @@ public:
bool empty() const { return processors.empty(); }
+ size_t getNumThreads() const { return num_threads; }
+ void setNumThreads(size_t num_threads_) { num_threads = num_threads_; }
void addSource(ProcessorPtr processor);
void addSink(ProcessorPtr processor);
@@ -37,7 +40,7 @@ public:
static std::list<ProcessorPtr> getProcessors(Chain chain) { return std::move(chain.processors); }
void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); }
- void attachResourcesFrom(Chain & other) { holder = std::move(other.holder); }
+ void attachResources(PipelineResourcesHolder holder_) { holder = std::move(holder_); }
PipelineResourcesHolder detachResources() { return std::move(holder); }
void reset();
@@ -48,6 +51,7 @@ private:
/// input port output port
std::list<ProcessorPtr> processors;
PipelineResourcesHolder holder;
+ size_t num_threads = 0;
};
}
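The new Chain members above follow a simple move-only ownership pattern: detachResources() moves the holder out, attachResources() moves it in, so locks and storage holders are owned by exactly one chain at a time. A rough sketch of that pattern, with hypothetical MiniChain and ResourcesHolder types standing in for Chain and PipelineResourcesHolder:

#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

/// Stand-in for PipelineResourcesHolder: owns objects that must outlive the processors.
struct ResourcesHolder
{
    std::vector<std::shared_ptr<std::string>> storage_holders;
};

/// Stand-in for Chain: a resources holder plus a declared level of parallelism.
class MiniChain
{
public:
    size_t getNumThreads() const { return num_threads; }
    void setNumThreads(size_t num_threads_) { num_threads = num_threads_; }

    /// Take ownership of resources detached from another chain or pipe.
    void attachResources(ResourcesHolder holder_) { holder = std::move(holder_); }

    /// Give the resources away, e.g. to the chain that replaces this one.
    ResourcesHolder detachResources() { return std::move(holder); }

private:
    ResourcesHolder holder;
    size_t num_threads = 0;
};

int main()
{
    MiniChain view_chain, result_chain;
    view_chain.attachResources({{std::make_shared<std::string>("table lock")}});

    /// Same shape as result_chain.attachResources(chain.detachResources()) in the diff.
    result_chain.attachResources(view_chain.detachResources());
    std::cout << result_chain.detachResources().storage_holders.size() << " resource(s) transferred\n";
}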

View File

@@ -6,6 +6,11 @@
namespace DB
{
+ namespace ErrorCodes
+ {
+ extern const int LOGICAL_ERROR;
+ }
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
pulling_format = std::make_shared<PullingOutputFormat>(pipeline.getHeader(), has_data_flag);

View File

@@ -127,9 +127,7 @@ static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGrou
}
- PushingAsyncPipelineExecutor::PushingAsyncPipelineExecutor(Chain & chain_, size_t num_threads_)
- : num_threads(num_threads_)
- , chain(chain_)
+ PushingAsyncPipelineExecutor::PushingAsyncPipelineExecutor(Chain & chain_) : chain(chain_)
{
pushing_source = std::make_shared<PushingAsyncSource>(chain.getInputHeader());
auto sink = std::make_shared<ExceptionHandlingSink>(chain.getOutputHeader());
@@ -175,7 +173,7 @@ void PushingAsyncPipelineExecutor::start()
auto func = [&, thread_group = CurrentThread::getGroup()]()
{
- threadFunction(*data, thread_group, num_threads);
+ threadFunction(*data, thread_group, chain.getNumThreads());
};
data->thread = ThreadFromGlobalPool(std::move(func));
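With num_threads gone from the executor, the chain is the single source of truth for parallelism and the executor simply asks it when the worker is started. A toy illustration of that shift, using hypothetical stub types rather than the real classes:

#include <cstddef>
#include <iostream>

struct ChainStub
{
    size_t getNumThreads() const { return num_threads; }
    void setNumThreads(size_t n) { num_threads = n; }
private:
    size_t num_threads = 0;
};

/// The executor no longer stores its own thread count; it keeps a reference
/// to the chain and reads the value at start() time.
class ExecutorStub
{
public:
    explicit ExecutorStub(ChainStub & chain_) : chain(chain_) {}

    void start() const { std::cout << "starting with " << chain.getNumThreads() << " threads\n"; }

private:
    ChainStub & chain;
};

int main()
{
    ChainStub chain;
    chain.setNumThreads(4);

    ExecutorStub executor(chain);  /// no separate num_threads constructor argument
    executor.start();
}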

View File

@@ -28,7 +28,7 @@ using Processors = std::vector<ProcessorPtr>;
class PushingAsyncPipelineExecutor
{
public:
- explicit PushingAsyncPipelineExecutor(Chain & chain, size_t num_threads_);
+ explicit PushingAsyncPipelineExecutor(Chain & chain);
~PushingAsyncPipelineExecutor();
/// Get structure of returned block or chunk.
@@ -47,8 +47,6 @@ public:
struct Data;
private:
- size_t num_threads;
Chain & chain;
std::shared_ptr<PushingAsyncSource> pushing_source;

View File

@@ -17,7 +17,7 @@ public:
, need_data_flag(need_data_flag_)
{}
- String getName() const override { return "PullingOutputFormat"; }
+ String getName() const override { return "PushingSource"; }
void setData(Chunk chunk)
{
@@ -70,7 +70,7 @@ PushingPipelineExecutor::~PushingPipelineExecutor()
}
catch (...)
{
- tryLogCurrentException("PullingPipelineExecutor");
+ tryLogCurrentException("PushingPipelineExecutor");
}
}

View File

@@ -104,6 +104,11 @@ void Pipe::addQueryPlan(std::unique_ptr<QueryPlan> plan)
holder.query_plans.emplace_back(std::move(plan));
}
+ PipelineResourcesHolder Pipe::detachResources()
+ {
+ return std::move(holder);
+ }
Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes)
{
if (!source->getInputs().empty())
@@ -670,9 +675,13 @@ void Pipe::addChains(std::vector<Chain> chains)
dropTotals();
dropExtremes();
+ size_t max_parallel_streams_for_chains = 0;
Block new_header;
for (size_t i = 0; i < output_ports.size(); ++i)
{
+ max_parallel_streams_for_chains += std::max<size_t>(chains[i].getNumThreads(), 1);
if (i == 0)
new_header = chains[i].getOutputHeader();
else
@@ -693,6 +702,7 @@ void Pipe::addChains(std::vector<Chain> chains)
}
header = std::move(new_header);
+ max_parallel_streams = std::max(max_parallel_streams, max_parallel_streams_for_chains);
}
void Pipe::resize(size_t num_streams, bool force, bool strict)
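Both buildPushingToViewsDrain and Pipe::addChains now derive the combined parallelism by summing each chain's thread count, counting a chain that reports zero as one stream. A small sketch of that aggregation with a stub chain type (not the real Chain):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

struct ChainStub
{
    size_t num_threads = 0;
    size_t getNumThreads() const { return num_threads; }
};

int main()
{
    /// Three chains: no declared parallelism, 2 threads, 4 threads.
    std::vector<ChainStub> chains = {{0}, {2}, {4}};

    size_t max_parallel_streams_for_chains = 0;
    for (const auto & chain : chains)
        max_parallel_streams_for_chains += std::max<size_t>(chain.getNumThreads(), 1);

    /// max(0, 1) + max(2, 1) + max(4, 1) = 1 + 2 + 4 = 7
    std::cout << max_parallel_streams_for_chains << " parallel streams\n";
}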

View File

@@ -115,6 +115,8 @@ public:
/// For queries with nested interpreters (i.e. StorageDistributed)
void addQueryPlan(std::unique_ptr<QueryPlan> plan);
+ PipelineResourcesHolder detachResources();
private:
/// Destruction order: processors, header, locks, temporary storages, local contexts
PipelineResourcesHolder holder;