ISSUES-4006 filter sign = -1 rows

This commit is contained in:
zhang2014 2020-06-24 13:28:27 +08:00
parent 9324c0ee49
commit a9e1794bd3
3 changed files with 52 additions and 18 deletions

View File

@ -310,8 +310,8 @@ static inline size_t onUpdateData(const std::vector<Field> & rows_data, Block &
std::vector<bool> difference_sorting_keys_mark(rows_data.size() / 2); std::vector<bool> difference_sorting_keys_mark(rows_data.size() / 2);
for (size_t index = 0; index < rows_data.size(); index += 2) for (size_t index = 0; index < rows_data.size(); index += 2)
difference_sorting_keys_mark.emplace_back(differenceSortingKeys( difference_sorting_keys_mark[index / 2] = differenceSortingKeys(
DB::get<const Tuple &>(rows_data[index]), DB::get<const Tuple &>(rows_data[index + 1]), sorting_columns_index)); DB::get<const Tuple &>(rows_data[index]), DB::get<const Tuple &>(rows_data[index + 1]), sorting_columns_index);
for (size_t column = 0; column < buffer.columns() - 2; ++column) for (size_t column = 0; column < buffer.columns() - 2; ++column)
{ {
@ -447,6 +447,7 @@ MaterializeMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializeMySQL
for (const auto & required_name_for_sorting_key : required_for_sorting_key) for (const auto & required_name_for_sorting_key : required_for_sorting_key)
buffer_and_soring_columns->second.emplace_back( buffer_and_soring_columns->second.emplace_back(
buffer_and_soring_columns->first.getPositionByName(required_name_for_sorting_key)); buffer_and_soring_columns->first.getPositionByName(required_name_for_sorting_key));
} }
return buffer_and_soring_columns; return buffer_and_soring_columns;

View File

@ -35,7 +35,13 @@ Pipes StorageMaterializeMySQL::read(
size_t max_block_size, size_t max_block_size,
unsigned int num_streams) unsigned int num_streams)
{ {
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>()) NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
Block nested_header = nested_storage->getSampleBlockNonMaterialized();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>(); select_query && !column_names_set.count(version_column.name))
{ {
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>(); auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
@ -48,28 +54,53 @@ Pipes StorageMaterializeMySQL::read(
} }
} }
String filter_column_name;
Names require_columns_name = column_names; Names require_columns_name = column_names;
Block header = nested_storage->getSampleBlockNonMaterialized(); ASTPtr expressions = std::make_shared<ASTExpressionList>();
ColumnWithTypeAndName & sign_column = header.getByPosition(header.columns() - 2); if (column_names_set.empty() || !column_names_set.count(sign_column.name))
{
if (require_columns_name.end() == std::find(require_columns_name.begin(), require_columns_name.end(), sign_column.name))
require_columns_name.emplace_back(sign_column.name); require_columns_name.emplace_back(sign_column.name);
return nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams); const auto & sign_column_name = std::make_shared<ASTIdentifier>(sign_column.name);
const auto & fetch_sign_value = std::make_shared<ASTLiteral>(Field(Int8(1)));
/*for (auto & pipe : pipes) expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
filter_column_name = expressions->children.back()->getColumnName();
for (const auto & column_name : column_names)
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
}
Pipes pipes = nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams);
if (!expressions->children.empty() && !pipes.empty())
{ {
std::cout << "Pipe Header Structure:" << pipe.getHeader().dumpStructure() << "\n"; Block pipe_header = pipes.front().getHeader();
ASTPtr expr = makeASTFunction( SyntaxAnalyzerResultPtr syntax = SyntaxAnalyzer(context).analyze(expressions, pipe_header.getNamesAndTypesList());
"equals", std::make_shared<ASTIdentifier>(sign_column.name), std::make_shared<ASTLiteral>(Field(Int8(1)))); ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true);
auto syntax = SyntaxAnalyzer(context).analyze(expr, pipe.getHeader().getNamesAndTypesList());
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expr, syntax, context).getActions(true);
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), expression_actions, expr->getColumnName(), false)); for (auto & pipe : pipes)
/// TODO: maybe need remove sign columns {
}*/ assertBlocksHaveEqualStructure(pipe_header, pipe.getHeader(), "StorageMaterializeMySQL");
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), expression_actions, filter_column_name, false));
}
}
// return pipes; return pipes;
}
NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const
{
NamesAndTypesList virtuals;
Block nested_header = nested_storage->getSampleBlockNonMaterialized();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
virtuals.emplace_back(NameAndTypePair(sign_column.name, sign_column.type));
virtuals.emplace_back(NameAndTypePair(version_column.name, version_column.type));
auto nested_virtuals = nested_storage->getVirtuals();
virtuals.insert(virtuals.end(), nested_virtuals.begin(), nested_virtuals.end());
return virtuals;
} }
} }

View File

@ -21,6 +21,8 @@ public:
const Names & column_names, const SelectQueryInfo & query_info, const Names & column_names, const SelectQueryInfo & query_info,
const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override; const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;
NamesAndTypesList getVirtuals() const override;
private: private:
StoragePtr nested_storage; StoragePtr nested_storage;
}; };