2021-03-20 14:13:57 +00:00
|
|
|
#include <Storages/ReadFinalForExternalReplicaStorage.h>
|
|
|
|
|
|
|
|
#if USE_MYSQL || USE_LIBPQXX
|
|
|
|
|
|
|
|
#include <Interpreters/ExpressionAnalyzer.h>
|
|
|
|
#include <Interpreters/TreeRewriter.h>
|
|
|
|
#include <Parsers/ASTFunction.h>
|
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
|
|
|
#include <Parsers/ASTTablesInSelectQuery.h>
|
|
|
|
#include <Parsers/ASTLiteral.h>
|
|
|
|
#include <Parsers/ASTIdentifier.h>
|
|
|
|
#include <Processors/Transforms/FilterTransform.h>
|
|
|
|
#include <Interpreters/Context.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-07-05 19:07:56 +00:00
|
|
|
bool needRewriteQueryWithFinalForStorage(const Names & column_names, const StoragePtr & storage)
|
|
|
|
{
|
|
|
|
const StorageMetadataPtr & metadata = storage->getInMemoryMetadataPtr();
|
|
|
|
Block header = metadata->getSampleBlock();
|
|
|
|
ColumnWithTypeAndName & version_column = header.getByPosition(header.columns() - 1);
|
|
|
|
return std::find(column_names.begin(), column_names.end(), version_column.name) == column_names.end();
|
|
|
|
}
|
|
|
|
|
2021-03-20 14:13:57 +00:00
|
|
|
/// Reads the requested columns from nested_storage and, when the caller did not
/// ask for the sign column itself, appends a filter keeping only rows where that
/// column equals 1.
///
/// The sign column is taken to be the second-to-last column of the nested
/// storage's sample block.
///
/// @param nested_storage   Storage that is actually read; a share lock on it is
///                         acquired here and attached to the returned pipe.
/// @param column_names     Columns requested by the caller.
/// @param query_info       Forwarded unchanged to nested_storage->read().
/// @param context          Supplies the query id and lock timeout, and is used
///                         to analyze the filter expression.
/// @param processed_stage  Forwarded unchanged to nested_storage->read().
/// @param max_block_size   Forwarded unchanged to nested_storage->read().
/// @param num_streams      Forwarded unchanged to nested_storage->read().
/// @return The pipe produced by the nested storage, with the table lock attached
///         and, if applicable, the sign filter transform added.
///
/// NOTE(review): the position arithmetic assumes the nested sample block has at
/// least two columns — confirm against callers.
Pipe readFinalFromNestedStorage(
    StoragePtr nested_storage,
    const Names & column_names,
    const StorageMetadataPtr & /*metadata_snapshot*/,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned int num_streams)
{
    NameSet requested_columns(column_names.begin(), column_names.end());

    /// Take the share lock before looking at the nested storage's metadata.
    auto table_lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
    const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();

    Block nested_sample = nested_metadata->getSampleBlock();
    ColumnWithTypeAndName & sign_column = nested_sample.getByPosition(nested_sample.columns() - 2);

    String filter_column_name;
    Names columns_to_read = column_names;
    ASTPtr filter_expressions = std::make_shared<ASTExpressionList>();

    const bool sign_requested = requested_columns.find(sign_column.name) != requested_columns.end();
    if (!sign_requested)
    {
        /// The sign column is not part of the request: read it anyway and
        /// build the predicate `<sign column> = 1` to filter on it.
        columns_to_read.emplace_back(sign_column.name);

        const auto sign_identifier = std::make_shared<ASTIdentifier>(sign_column.name);
        const auto expected_sign = std::make_shared<ASTLiteral>(Field(Int8(1)));

        ASTPtr sign_predicate = makeASTFunction("equals", sign_identifier, expected_sign);
        filter_column_name = sign_predicate->getColumnName();
        filter_expressions->children.emplace_back(sign_predicate);
    }

    Pipe pipe = nested_storage->read(columns_to_read, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
    pipe.addTableLock(table_lock);

    /// Nothing to filter: either no predicate was built or the pipe is empty.
    if (filter_expressions->children.empty() || pipe.empty())
        return pipe;

    Block source_header = pipe.getHeader();
    auto syntax_result = TreeRewriter(context).analyze(filter_expressions, source_header.getNamesAndTypesList());
    ExpressionActionsPtr filter_actions
        = ExpressionAnalyzer(filter_expressions, syntax_result, context).getActions(true /* add_aliases */, false /* project_result */);

    /// The last FilterTransform argument is passed as `false`
    /// (presumably remove_filter_column — TODO confirm against FilterTransform).
    pipe.addSimpleTransform([&filter_actions, &filter_column_name](const Block & header)
    {
        return std::make_shared<FilterTransform>(header, filter_actions, filter_column_name, false);
    });

    return pipe;
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|