#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int NOT_IMPLEMENTED; } class MergeTreeIndexSource : public ISource, WithContext { public: MergeTreeIndexSource( Block header_, Block index_header_, MergeTreeData::DataPartsVector data_parts_, ContextPtr context_, bool with_marks_) : ISource(header_) , WithContext(context_) , header(std::move(header_)) , index_header(std::move(index_header_)) , data_parts(std::move(data_parts_)) , with_marks(with_marks_) { } String getName() const override { return "MergeTreeIndex"; } protected: Chunk generate() override { if (part_index >= data_parts.size()) return {}; const auto & part = data_parts[part_index]; const auto & index_granularity = part->index_granularity; std::shared_ptr marks_loader; if (with_marks && isCompactPart(part)) marks_loader = createMarksLoader(part, MergeTreeDataPartCompact::DATA_FILE_NAME, part->getColumns().size()); size_t num_columns = header.columns(); size_t num_rows = index_granularity.getMarksCount(); const auto & part_name_column = StorageMergeTreeIndex::part_name_column; const auto & mark_number_column = StorageMergeTreeIndex::mark_number_column; const auto & rows_in_granule_column = StorageMergeTreeIndex::rows_in_granule_column; const auto & index = part->getIndex(); Columns result_columns(num_columns); for (size_t pos = 0; pos < num_columns; ++pos) { const auto & column_name = header.getByPosition(pos).name; const auto & column_type = header.getByPosition(pos).type; if (index_header.has(column_name)) { size_t index_position = index_header.getPositionByName(column_name); result_columns[pos] = index[index_position]; } else if (column_name == part_name_column.name) { auto column = column_type->createColumnConst(num_rows, part->name); result_columns[pos] = column->convertToFullColumnIfConst(); } else if (column_name == mark_number_column.name) { auto column = column_type->createColumn(); auto & data = assert_cast(*column).getData(); data.resize(num_rows); std::iota(data.begin(), data.end(), 0); result_columns[pos] = std::move(column); } else if (column_name == rows_in_granule_column.name) { auto column = column_type->createColumn(); auto & data = assert_cast(*column).getData(); data.resize(num_rows); for (size_t i = 0; i < num_rows; ++i) data[i] = index_granularity.getMarkRows(i); result_columns[pos] = std::move(column); } else if (auto [first, second] = Nested::splitName(column_name, true); with_marks && second == "mark") { result_columns[pos] = fillMarks(part, marks_loader, *column_type, first); } else { throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "No such column {}", column_name); } } ++part_index; return Chunk(std::move(result_columns), num_rows); } private: std::shared_ptr createMarksLoader(const MergeTreeDataPartPtr & part, const String & prefix_name, size_t num_columns) { auto info_for_read = std::make_shared(part, std::make_shared()); auto local_context = getContext(); return std::make_shared( info_for_read, local_context->getMarkCache().get(), info_for_read->getIndexGranularityInfo().getMarksFilePath(prefix_name), info_for_read->getMarksCount(), info_for_read->getIndexGranularityInfo(), /*save_marks_in_cache=*/ false, local_context->getReadSettings(), /*load_marks_threadpool=*/ nullptr, num_columns); } ColumnPtr fillMarks( MergeTreeDataPartPtr part, std::shared_ptr marks_loader, const IDataType & data_type, const String & column_name) { size_t col_idx = 0; bool has_marks_in_part = false; size_t num_rows = part->index_granularity.getMarksCount(); if (isWidePart(part)) { if (auto stream_name = part->getStreamNameOrHash(column_name, part->checksums)) { col_idx = 0; has_marks_in_part = true; marks_loader = createMarksLoader(part, *stream_name, /*num_columns=*/ 1); } } else if (isCompactPart(part)) { auto unescaped_name = unescapeForFileName(column_name); if (auto col_idx_opt = part->getColumnPosition(unescaped_name)) { col_idx = *col_idx_opt; has_marks_in_part = true; } } else { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Parts with type {} are not supported", part->getTypeName()); } if (!has_marks_in_part) { auto column = data_type.createColumnConstWithDefaultValue(num_rows); return column->convertToFullColumnIfConst(); } auto compressed = ColumnUInt64::create(num_rows); auto uncompressed = ColumnUInt64::create(num_rows); auto & compressed_data = compressed->getData(); auto & uncompressed_data = uncompressed->getData(); auto marks_getter = marks_loader->loadMarks(); for (size_t i = 0; i < num_rows; ++i) { auto mark = marks_getter->getMark(i, col_idx); compressed_data[i] = mark.offset_in_compressed_file; uncompressed_data[i] = mark.offset_in_decompressed_block; } auto compressed_nullable = ColumnNullable::create(std::move(compressed), ColumnUInt8::create(num_rows, 0)); auto uncompressed_nullable = ColumnNullable::create(std::move(uncompressed), ColumnUInt8::create(num_rows, 0)); return ColumnTuple::create(Columns{std::move(compressed_nullable), std::move(uncompressed_nullable)}); } Block header; Block index_header; MergeTreeData::DataPartsVector data_parts; bool with_marks; size_t part_index = 0; }; const ColumnWithTypeAndName StorageMergeTreeIndex::part_name_column{std::make_shared(), "part_name"}; const ColumnWithTypeAndName StorageMergeTreeIndex::mark_number_column{std::make_shared(), "mark_number"}; const ColumnWithTypeAndName StorageMergeTreeIndex::rows_in_granule_column{std::make_shared(), "rows_in_granule"}; const Block StorageMergeTreeIndex::virtuals_sample_block{part_name_column, mark_number_column, rows_in_granule_column}; StorageMergeTreeIndex::StorageMergeTreeIndex( const StorageID & table_id_, const StoragePtr & source_table_, const ColumnsDescription & columns, bool with_marks_) : IStorage(table_id_) , source_table(source_table_) , with_marks(with_marks_) { const auto * merge_tree = dynamic_cast(source_table.get()); if (!merge_tree) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage MergeTreeIndex expected MergeTree table, got: {}", source_table->getName()); data_parts = merge_tree->getDataPartsVectorForInternalUsage(); key_sample_block = merge_tree->getInMemoryMetadataPtr()->getPrimaryKey().sample_block; StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns); setInMemoryMetadata(storage_metadata); } class ReadFromMergeTreeIndex : public SourceStepWithFilter { public: std::string getName() const override { return "ReadFromMergeTreeIndex"; } void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; ReadFromMergeTreeIndex( const Names & column_names_, const SelectQueryInfo & query_info_, const StorageSnapshotPtr & storage_snapshot_, const ContextPtr & context_, Block sample_block, std::shared_ptr storage_) : SourceStepWithFilter( DataStream{.header = std::move(sample_block)}, column_names_, query_info_, storage_snapshot_, context_) , storage(std::move(storage_)) , log(&Poco::Logger::get("StorageMergeTreeIndex")) { } void applyFilters(ActionDAGNodes added_filter_nodes) override; private: std::shared_ptr storage; Poco::Logger * log; const ActionsDAG::Node * predicate = nullptr; }; void ReadFromMergeTreeIndex::applyFilters(ActionDAGNodes added_filter_nodes) { filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes); if (filter_actions_dag) predicate = filter_actions_dag->getOutputs().at(0); } void StorageMergeTreeIndex::read( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum, size_t /*max_block_size*/, size_t /*num_streams*/) { const auto & storage_columns = source_table->getInMemoryMetadataPtr()->getColumns(); Names columns_from_storage; for (const auto & column_name : column_names) { if (storage_columns.hasColumnOrSubcolumn(GetColumnsOptions::All, column_name)) { columns_from_storage.push_back(column_name); continue; } if (with_marks) { auto [first, second] = Nested::splitName(column_name, true); auto unescaped_name = unescapeForFileName(first); if (second == "mark" && storage_columns.hasColumnOrSubcolumn(GetColumnsOptions::All, unescapeForFileName(unescaped_name))) { columns_from_storage.push_back(unescaped_name); continue; } } } context->checkAccess(AccessType::SELECT, source_table->getStorageID(), columns_from_storage); auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names); auto this_ptr = std::static_pointer_cast(shared_from_this()); auto reading = std::make_unique( column_names, query_info, storage_snapshot, std::move(context), std::move(sample_block), std::move(this_ptr)); query_plan.addStep(std::move(reading)); } void ReadFromMergeTreeIndex::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { auto filtered_parts = storage->getFilteredDataParts(predicate, context); LOG_DEBUG(log, "Reading index{}from {} parts of table {}", storage->with_marks ? " with marks " : " ", filtered_parts.size(), storage->source_table->getStorageID().getNameForLogs()); pipeline.init(Pipe(std::make_shared(getOutputStream().header, storage->key_sample_block, std::move(filtered_parts), context, storage->with_marks))); } MergeTreeData::DataPartsVector StorageMergeTreeIndex::getFilteredDataParts(const ActionsDAG::Node * predicate, const ContextPtr & context) const { if (!predicate) return data_parts; auto all_part_names = ColumnString::create(); for (const auto & part : data_parts) all_part_names->insert(part->name); Block filtered_block{{std::move(all_part_names), std::make_shared(), part_name_column.name}}; VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context); if (!filtered_block.rows()) return {}; auto part_names = filtered_block.getByPosition(0).column; const auto & part_names_str = assert_cast(*part_names); HashSet part_names_set; for (size_t i = 0; i < part_names_str.size(); ++i) part_names_set.insert(part_names_str.getDataAt(i)); MergeTreeData::DataPartsVector filtered_parts; for (const auto & part : data_parts) if (part_names_set.has(part->name)) filtered_parts.push_back(part); return filtered_parts; } }