From a9e1794bd3ec1f1eb13d7e4d1c199256ac522b9c Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 24 Jun 2020 13:28:27 +0800 Subject: [PATCH] ISSUES-4006 filter sign = -1 rows --- .../MySQL/MaterializeMySQLSyncThread.cpp | 5 +- src/Storages/StorageMaterializeMySQL.cpp | 63 ++++++++++++++----- src/Storages/StorageMaterializeMySQL.h | 2 + 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 16187585ce4..caaec6defcf 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -310,8 +310,8 @@ static inline size_t onUpdateData(const std::vector & rows_data, Block & std::vector difference_sorting_keys_mark(rows_data.size() / 2); for (size_t index = 0; index < rows_data.size(); index += 2) - difference_sorting_keys_mark.emplace_back(differenceSortingKeys( - DB::get(rows_data[index]), DB::get(rows_data[index + 1]), sorting_columns_index)); + difference_sorting_keys_mark[index / 2] = differenceSortingKeys( + DB::get(rows_data[index]), DB::get(rows_data[index + 1]), sorting_columns_index); for (size_t column = 0; column < buffer.columns() - 2; ++column) { @@ -447,6 +447,7 @@ MaterializeMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializeMySQL for (const auto & required_name_for_sorting_key : required_for_sorting_key) buffer_and_soring_columns->second.emplace_back( buffer_and_soring_columns->first.getPositionByName(required_name_for_sorting_key)); + } return buffer_and_soring_columns; diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp index aba9ea061e0..03a1d511d26 100644 --- a/src/Storages/StorageMaterializeMySQL.cpp +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -35,7 +35,13 @@ Pipes StorageMaterializeMySQL::read( size_t max_block_size, unsigned int num_streams) { - if (ASTSelectQuery * select_query = query_info.query->as()) + NameSet column_names_set = NameSet(column_names.begin(), column_names.end()); + + Block nested_header = nested_storage->getSampleBlockNonMaterialized(); + ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2); + ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1); + + if (ASTSelectQuery * select_query = query_info.query->as(); select_query && !column_names_set.count(version_column.name)) { auto & tables_in_select_query = select_query->tables()->as(); @@ -48,28 +54,53 @@ Pipes StorageMaterializeMySQL::read( } } + String filter_column_name; Names require_columns_name = column_names; - Block header = nested_storage->getSampleBlockNonMaterialized(); - ColumnWithTypeAndName & sign_column = header.getByPosition(header.columns() - 2); - - if (require_columns_name.end() == std::find(require_columns_name.begin(), require_columns_name.end(), sign_column.name)) + ASTPtr expressions = std::make_shared(); + if (column_names_set.empty() || !column_names_set.count(sign_column.name)) + { require_columns_name.emplace_back(sign_column.name); - return nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams); + const auto & sign_column_name = std::make_shared(sign_column.name); + const auto & fetch_sign_value = std::make_shared(Field(Int8(1))); - /*for (auto & pipe : pipes) + expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value)); + filter_column_name = expressions->children.back()->getColumnName(); + + for (const auto & column_name : column_names) + expressions->children.emplace_back(std::make_shared(column_name)); + } + + Pipes pipes = nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams); + + if (!expressions->children.empty() && !pipes.empty()) { - std::cout << "Pipe Header Structure:" << pipe.getHeader().dumpStructure() << "\n"; - ASTPtr expr = makeASTFunction( - "equals", std::make_shared(sign_column.name), std::make_shared(Field(Int8(1)))); - auto syntax = SyntaxAnalyzer(context).analyze(expr, pipe.getHeader().getNamesAndTypesList()); - ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expr, syntax, context).getActions(true); + Block pipe_header = pipes.front().getHeader(); + SyntaxAnalyzerResultPtr syntax = SyntaxAnalyzer(context).analyze(expressions, pipe_header.getNamesAndTypesList()); + ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true); - pipe.addSimpleTransform(std::make_shared(pipe.getHeader(), expression_actions, expr->getColumnName(), false)); - /// TODO: maybe need remove sign columns - }*/ + for (auto & pipe : pipes) + { + assertBlocksHaveEqualStructure(pipe_header, pipe.getHeader(), "StorageMaterializeMySQL"); + pipe.addSimpleTransform(std::make_shared(pipe.getHeader(), expression_actions, filter_column_name, false)); + } + } -// return pipes; + return pipes; +} + +NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const +{ + NamesAndTypesList virtuals; + Block nested_header = nested_storage->getSampleBlockNonMaterialized(); + ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2); + ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1); + virtuals.emplace_back(NameAndTypePair(sign_column.name, sign_column.type)); + virtuals.emplace_back(NameAndTypePair(version_column.name, version_column.type)); + + auto nested_virtuals = nested_storage->getVirtuals(); + virtuals.insert(virtuals.end(), nested_virtuals.begin(), nested_virtuals.end()); + return virtuals; } } diff --git a/src/Storages/StorageMaterializeMySQL.h b/src/Storages/StorageMaterializeMySQL.h index d88e72ed737..a74f2208a9a 100644 --- a/src/Storages/StorageMaterializeMySQL.h +++ b/src/Storages/StorageMaterializeMySQL.h @@ -21,6 +21,8 @@ public: const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override; + NamesAndTypesList getVirtuals() const override; + private: StoragePtr nested_storage; };