diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp deleted file mode 100644 index 76c378c07e0..00000000000 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ /dev/null @@ -1,401 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -namespace DB -{ - -PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( - const StoragePtr & storage_, - const StorageMetadataPtr & metadata_snapshot_, - ContextPtr context_, - const ASTPtr & query_ptr_, - bool no_destination) - : WithContext(context_) - , storage(storage_) - , metadata_snapshot(metadata_snapshot_) - , log(&Poco::Logger::get("PushingToViewsBlockOutputStream")) - , query_ptr(query_ptr_) -{ - checkStackSize(); - - /** TODO This is a very important line. At any insertion into the table one of streams should own lock. - * Although now any insertion into the table is done via PushingToViewsBlockOutputStream, - * but it's clear that here is not the best place for this functionality. - */ - addTableLock( - storage->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout)); - - /// If the "root" table deduplicates blocks, there are no need to make deduplication for children - /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks - bool disable_deduplication_for_children = false; - if (!getContext()->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views) - disable_deduplication_for_children = !no_destination && storage->supportsDeduplication(); - - auto table_id = storage->getStorageID(); - Dependencies dependencies = DatabaseCatalog::instance().getDependencies(table_id); - - /// We need special context for materialized views insertions - if (!dependencies.empty()) - { - select_context = Context::createCopy(context); - insert_context = Context::createCopy(context); - - const auto & insert_settings = insert_context->getSettingsRef(); - - // Do not deduplicate insertions into MV if the main insertion is Ok - if (disable_deduplication_for_children) - insert_context->setSetting("insert_deduplicate", Field{false}); - - // Separate min_insert_block_size_rows/min_insert_block_size_bytes for children - if (insert_settings.min_insert_block_size_rows_for_materialized_views) - insert_context->setSetting("min_insert_block_size_rows", insert_settings.min_insert_block_size_rows_for_materialized_views.value); - if (insert_settings.min_insert_block_size_bytes_for_materialized_views) - insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value); - } - - for (const auto & database_table : dependencies) - { - auto dependent_table = DatabaseCatalog::instance().getTable(database_table, getContext()); - auto dependent_metadata_snapshot = dependent_table->getInMemoryMetadataPtr(); - - ASTPtr query; - BlockOutputStreamPtr out; - - if (auto * materialized_view = dynamic_cast(dependent_table.get())) - { - addTableLock( - materialized_view->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout)); - - StoragePtr inner_table = materialized_view->getTargetTable(); - auto inner_table_id = inner_table->getStorageID(); - auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr(); - query = dependent_metadata_snapshot->getSelectQuery().inner_query; - - std::unique_ptr insert = std::make_unique(); - insert->table_id = inner_table_id; - - /// Get list of columns we get from select query. - auto header = InterpreterSelectQuery(query, select_context, SelectQueryOptions().analyze()) - .getSampleBlock(); - - /// Insert only columns returned by select. - auto list = std::make_shared(); - const auto & inner_table_columns = inner_metadata_snapshot->getColumns(); - for (const auto & column : header) - { - /// But skip columns which storage doesn't have. - if (inner_table_columns.hasPhysical(column.name)) - list->children.emplace_back(std::make_shared(column.name)); - } - - insert->columns = std::move(list); - - ASTPtr insert_query_ptr(insert.release()); - InterpreterInsertQuery interpreter(insert_query_ptr, insert_context); - BlockIO io = interpreter.execute(); - out = io.out; - } - else if ( - dynamic_cast(dependent_table.get()) || dynamic_cast(dependent_table.get())) - out = std::make_shared( - dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true); - else - out = std::make_shared( - dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr()); - - views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, 0 /* elapsed_ms */}); - } - - /// Do not push to destination table if the flag is set - if (!no_destination) - { - output = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), getContext()); - replicated_output = dynamic_cast(output.get()); - } -} - - -Block PushingToViewsBlockOutputStream::getHeader() const -{ - /// If we don't write directly to the destination - /// then expect that we're inserting with precalculated virtual columns - if (output) - return metadata_snapshot->getSampleBlock(); - else - return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals()); -} - - -void PushingToViewsBlockOutputStream::write(const Block & block) -{ - /** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match. - * We have to make this assertion before writing to table, because storage engine may assume that they have equal sizes. - * NOTE It'd better to do this check in serialization of nested structures (in place when this assumption is required), - * but currently we don't have methods for serialization of nested structures "as a whole". - */ - Nested::validateArraySizes(block); - - if (auto * live_view = dynamic_cast(storage.get())) - { - StorageLiveView::writeIntoLiveView(*live_view, block, getContext()); - } - else if (auto * window_view = dynamic_cast(storage.get())) - { - StorageWindowView::writeIntoWindowView(*window_view, block, getContext()); - } - else - { - if (output) - /// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended - /// with additional columns directly from storage and pass it to MVs instead of raw block. - output->write(block); - } - - /// Don't process materialized views if this block is duplicate - if (!getContext()->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate()) - return; - - // Insert data into materialized views only after successful insert into main table - const Settings & settings = getContext()->getSettingsRef(); - if (settings.parallel_view_processing && views.size() > 1) - { - // Push to views concurrently if enabled and more than one view is attached - ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); - for (auto & view : views) - { - auto thread_group = CurrentThread::getGroup(); - pool.scheduleOrThrowOnError([=, &view, this] - { - setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); - process(block, view); - }); - } - // Wait for concurrent view processing - pool.wait(); - } - else - { - // Process sequentially - for (auto & view : views) - { - process(block, view); - - if (view.exception) - std::rethrow_exception(view.exception); - } - } -} - -void PushingToViewsBlockOutputStream::writePrefix() -{ - if (output) - output->writePrefix(); - - for (auto & view : views) - { - try - { - view.out->writePrefix(); - } - catch (Exception & ex) - { - ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); - throw; - } - } -} - -void PushingToViewsBlockOutputStream::writeSuffix() -{ - if (output) - output->writeSuffix(); - - std::exception_ptr first_exception; - - const Settings & settings = getContext()->getSettingsRef(); - bool parallel_processing = false; - - /// Run writeSuffix() for views in separate thread pool. - /// In could have been done in PushingToViewsBlockOutputStream::process, however - /// it is not good if insert into main table fail but into view succeed. - if (settings.parallel_view_processing && views.size() > 1) - { - parallel_processing = true; - - // Push to views concurrently if enabled and more than one view is attached - ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); - auto thread_group = CurrentThread::getGroup(); - - for (auto & view : views) - { - if (view.exception) - continue; - - pool.scheduleOrThrowOnError([thread_group, &view, this] - { - setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); - - Stopwatch watch; - try - { - view.out->writeSuffix(); - } - catch (...) - { - view.exception = std::current_exception(); - } - view.elapsed_ms += watch.elapsedMilliseconds(); - - LOG_TRACE(log, "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.elapsed_ms); - }); - } - // Wait for concurrent view processing - pool.wait(); - } - - for (auto & view : views) - { - if (view.exception) - { - if (!first_exception) - first_exception = view.exception; - - continue; - } - - if (parallel_processing) - continue; - - Stopwatch watch; - try - { - view.out->writeSuffix(); - } - catch (Exception & ex) - { - ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); - throw; - } - view.elapsed_ms += watch.elapsedMilliseconds(); - - LOG_TRACE(log, "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.elapsed_ms); - } - - if (first_exception) - std::rethrow_exception(first_exception); - - UInt64 milliseconds = main_watch.elapsedMilliseconds(); - if (views.size() > 1) - { - LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", - storage->getStorageID().getNameForLogs(), views.size(), - milliseconds); - } -} - -void PushingToViewsBlockOutputStream::flush() -{ - if (output) - output->flush(); - - for (auto & view : views) - view.out->flush(); -} - -void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view) -{ - Stopwatch watch; - - try - { - BlockInputStreamPtr in; - - /// We need keep InterpreterSelectQuery, until the processing will be finished, since: - /// - /// - We copy Context inside InterpreterSelectQuery to support - /// modification of context (Settings) for subqueries - /// - InterpreterSelectQuery lives shorter than query pipeline. - /// It's used just to build the query pipeline and no longer needed - /// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery, - /// **can** take a reference to Context from InterpreterSelectQuery - /// (the problem raises only when function uses context from the - /// execute*() method, like FunctionDictGet do) - /// - These objects live inside query pipeline (DataStreams) and the reference become dangling. - std::optional select; - - if (view.query) - { - /// We create a table with the same name as original table and the same alias columns, - /// but it will contain single block (that is INSERT-ed into main table). - /// InterpreterSelectQuery will do processing of alias columns. - - auto local_context = Context::createCopy(select_context); - local_context->addViewSource( - StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals())); - select.emplace(view.query, local_context, SelectQueryOptions()); - in = std::make_shared(select->execute().getInputStream()); - - /// Squashing is needed here because the materialized view query can generate a lot of blocks - /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY - /// and two-level aggregation is triggered). - in = std::make_shared( - in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes); - in = std::make_shared(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name); - } - else - in = std::make_shared(block); - - in->readPrefix(); - - while (Block result_block = in->read()) - { - Nested::validateArraySizes(result_block); - view.out->write(result_block); - } - - in->readSuffix(); - } - catch (Exception & ex) - { - ex.addMessage("while pushing to view " + view.table_id.getNameForLogs()); - view.exception = std::current_exception(); - } - catch (...) - { - view.exception = std::current_exception(); - } - - view.elapsed_ms += watch.elapsedMilliseconds(); -} - -}