// ClickHouse/src/Storages/ReadFinalForExternalReplicaStorage.cpp
// (93 lines, 3.5 KiB, C++; text captured from a git-blame web view)

#include <Storages/ReadFinalForExternalReplicaStorage.h>
#if USE_MYSQL || USE_LIBPQXX
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Transforms/FilterTransform.h>
2022-05-20 19:49:31 +00:00
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/Context.h>
2022-06-01 15:21:47 +00:00
#include <Interpreters/InterpreterSelectQuery.h>
namespace DB
{
bool needRewriteQueryWithFinalForStorage(const Names & column_names, const StoragePtr & storage)
{
const StorageMetadataPtr & metadata = storage->getInMemoryMetadataPtr();
Block header = metadata->getSampleBlock();
ColumnWithTypeAndName & version_column = header.getByPosition(header.columns() - 1);
return std::find(column_names.begin(), column_names.end(), version_column.name) == column_names.end();
}
2022-05-20 19:49:31 +00:00
void readFinalFromNestedStorage(
QueryPlan & query_plan,
StoragePtr nested_storage,
const Names & column_names,
SelectQueryInfo & query_info,
2021-04-11 07:44:40 +00:00
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
2021-04-11 07:44:40 +00:00
auto lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
const auto & nested_metadata = nested_storage->getInMemoryMetadataPtr();
Block nested_header = nested_metadata->getSampleBlock();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
String filter_column_name;
Names require_columns_name = column_names;
ASTPtr expressions = std::make_shared<ASTExpressionList>();
if (column_names_set.empty() || !column_names_set.contains(sign_column.name))
{
require_columns_name.emplace_back(sign_column.name);
const auto & sign_column_name = std::make_shared<ASTIdentifier>(sign_column.name);
const auto & fetch_sign_value = std::make_shared<ASTLiteral>(Field(static_cast<Int8>(1)));
expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
filter_column_name = expressions->children.back()->getColumnName();
}
auto nested_snapshot = nested_storage->getStorageSnapshot(nested_metadata, context);
nested_storage->read(
query_plan, require_columns_name, nested_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
2022-06-01 15:21:47 +00:00
if (!query_plan.isInitialized())
{
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, nested_header, query_info, context);
return;
}
2022-05-20 19:49:31 +00:00
query_plan.addTableLock(lock);
query_plan.addStorageHolder(nested_storage);
2022-05-20 19:49:31 +00:00
if (!expressions->children.empty())
{
2022-05-20 19:49:31 +00:00
const auto & header = query_plan.getCurrentDataStream().header;
auto syntax = TreeRewriter(context).analyze(expressions, header.getNamesAndTypesList());
auto actions = ExpressionAnalyzer(expressions, syntax, context).getActionsDAG(true /* add_aliases */, false /* project_result */);
2022-05-20 19:49:31 +00:00
auto step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
actions,
filter_column_name,
false);
2022-05-20 19:49:31 +00:00
step->setStepDescription("Filter columns");
query_plan.addStep(std::move(step));
}
}
}
#endif