Merge pull request #3208 from vavrusa/master

PushingToViewsBlockOutputStream: process blocks concurrently
This commit is contained in:
alexey-milovidov 2018-10-01 04:42:55 +03:00 committed by GitHub
commit a4736275c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 26 deletions

View File

@ -2,9 +2,12 @@
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
namespace DB
{
@ -73,35 +76,30 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
if (replicated_output && replicated_output->lastBlockIsDuplicate())
return;
/// Insert data into materialized views only after successful insert into main table
for (auto & view : views)
// Insert data into materialized views only after successful insert into main table
bool allow_concurrent_view_processing = context.getSettingsRef().allow_concurrent_view_processing;
if (allow_concurrent_view_processing && views.size() > 1)
{
try
// Push to views concurrently if enabled, and more than one view is attached
ThreadPool pool(std::min(getNumberOfPhysicalCPUCores(), views.size()));
for (size_t view_num = 0; view_num < views.size(); ++view_num)
{
BlockInputStreamPtr from = std::make_shared<OneBlockInputStream>(block);
InterpreterSelectQuery select(view.query, *views_context, from);
BlockInputStreamPtr in = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
in->readPrefix();
while (Block result_block = in->read())
auto thread_group = CurrentThread::getGroup();
pool.schedule([=] ()
{
Nested::validateArraySizes(result_block);
view.out->write(result_block);
}
in->readSuffix();
}
catch (Exception & ex)
{
ex.addMessage("while pushing to view " + view.database + "." + view.table);
throw;
setThreadName("PushingToViewsBlockOutputStream");
CurrentThread::attachToIfDetached(thread_group);
process(block, view_num);
});
}
// Wait for concurrent view processing
pool.wait();
}
else
{
// Process sequentially
for (size_t view_num = 0; view_num < views.size(); ++view_num)
process(block, view_num);
}
}
@ -152,4 +150,36 @@ void PushingToViewsBlockOutputStream::flush()
view.out->flush();
}
void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_num)
{
auto & view = views[view_num];
try
{
BlockInputStreamPtr from = std::make_shared<OneBlockInputStream>(block);
InterpreterSelectQuery select(view.query, *views_context, from);
BlockInputStreamPtr in = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
in->readPrefix();
while (Block result_block = in->read())
{
Nested::validateArraySizes(result_block);
view.out->write(result_block);
}
in->readSuffix();
}
catch (Exception & ex)
{
ex.addMessage("while pushing to view " + backQuoteIfNeed(view.database) + "." + backQuoteIfNeed(view.table));
throw;
}
}
}

View File

@ -47,6 +47,8 @@ private:
std::vector<ViewInfo> views;
std::unique_ptr<Context> views_context;
void process(const Block & block, size_t view_num);
};

View File

@ -291,6 +291,7 @@ struct Settings
M(SettingUInt64, http_max_multipart_form_data_size, 1024 * 1024 * 1024, "Limit on size of multipart/form-data content. This setting cannot be parsed from URL parameters and should be set in user profile. Note that content is parsed and external tables are created in memory before start of query execution. And this is the only limit that has effect on that stage (limits on max memory usage and max execution time have no effect while reading HTTP form data).") \
M(SettingBool, calculate_text_stack_trace, 1, "Calculate text stack trace in case of exceptions during query execution. This is the default. It requires symbol lookups that may slow down fuzzing tests when huge amount of wrong queries are executed. In normal cases you should not disable this option.") \
M(SettingBool, allow_ddl, true, "If it is set to true, then a user is allowed to executed DDL queries.") \
M(SettingBool, allow_concurrent_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \