#include #include #include #include #include #include #include #include #include #include /// toLower #include namespace DB { namespace ErrorCodes { extern const int UNKNOWN_SET_DATA_VARIANT; extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int INCOMPATIBLE_TYPE_OF_JOIN; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int BAD_ARGUMENTS; } StorageJoin::StorageJoin( const String & path_, const String & name_, const Names & key_names_, bool use_nulls_, SizeLimits limits_, ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, const ColumnsDescription & columns_) : StorageSetOrJoinBase{path_, name_, columns_} , key_names(key_names_) , use_nulls(use_nulls_) , limits(limits_) , kind(kind_) , strictness(strictness_) { for (const auto & key : key_names) if (!getColumns().hasPhysical(key)) throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE}; join = std::make_shared(key_names, use_nulls, limits, kind, strictness); join->setSampleBlock(getSampleBlock().sortColumns()); restore(); } void StorageJoin::truncate(const ASTPtr &, const Context &) { Poco::File(path).remove(true); Poco::File(path).createDirectories(); Poco::File(path + "tmp/").createDirectories(); increment = 0; join = std::make_shared(key_names, use_nulls, limits, kind, strictness); join->setSampleBlock(getSampleBlock().sortColumns()); } void StorageJoin::assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const { /// NOTE Could be more loose. if (!(kind == kind_ && strictness == strictness_)) throw Exception("Table " + table_name + " has incompatible type of JOIN.", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN); } void StorageJoin::insertBlock(const Block & block) { join->insertFromBlock(block); } size_t StorageJoin::getSize() const { return join->getTotalRowCount(); } void registerStorageJoin(StorageFactory & factory) { factory.registerStorage("Join", [](const StorageFactory::Arguments & args) { /// Join(ANY, LEFT, k1, k2, ...) ASTs & engine_args = args.engine_args; if (engine_args.size() < 3) throw Exception( "Storage Join requires at least 3 parameters: Join(ANY|ALL, LEFT|INNER, keys...).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); const ASTIdentifier * strictness_id = typeid_cast(engine_args[0].get()); if (!strictness_id) throw Exception("First parameter of storage Join must be ANY or ALL (without quotes).", ErrorCodes::BAD_ARGUMENTS); const String strictness_str = Poco::toLower(strictness_id->name); ASTTableJoin::Strictness strictness; if (strictness_str == "any") strictness = ASTTableJoin::Strictness::Any; else if (strictness_str == "all") strictness = ASTTableJoin::Strictness::All; else throw Exception("First parameter of storage Join must be ANY or ALL (without quotes).", ErrorCodes::BAD_ARGUMENTS); const ASTIdentifier * kind_id = typeid_cast(engine_args[1].get()); if (!kind_id) throw Exception("Second parameter of storage Join must be LEFT or INNER (without quotes).", ErrorCodes::BAD_ARGUMENTS); const String kind_str = Poco::toLower(kind_id->name); ASTTableJoin::Kind kind; if (kind_str == "left") kind = ASTTableJoin::Kind::Left; else if (kind_str == "inner") kind = ASTTableJoin::Kind::Inner; else if (kind_str == "right") kind = ASTTableJoin::Kind::Right; else if (kind_str == "full") kind = ASTTableJoin::Kind::Full; else throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes).", ErrorCodes::BAD_ARGUMENTS); Names key_names; key_names.reserve(engine_args.size() - 2); for (size_t i = 2, size = engine_args.size(); i < size; ++i) { const ASTIdentifier * key = typeid_cast(engine_args[i].get()); if (!key) throw Exception("Parameter №" + toString(i + 1) + " of storage Join don't look like column name.", ErrorCodes::BAD_ARGUMENTS); key_names.push_back(key->name); } auto & settings = args.context.getSettingsRef(); auto join_use_nulls = settings.join_use_nulls; auto max_rows_in_join = settings.max_rows_in_join; auto max_bytes_in_join = settings.max_bytes_in_join; auto join_overflow_mode = settings.join_overflow_mode; if (args.storage_def && args.storage_def->settings) { for (const ASTSetQuery::Change & setting : args.storage_def->settings->changes) { if (setting.name == "join_use_nulls") join_use_nulls.set(setting.value); else if (setting.name == "max_rows_in_join") max_rows_in_join.set(setting.value); else if (setting.name == "max_bytes_in_join") max_bytes_in_join.set(setting.value); else if (setting.name == "join_overflow_mode") join_overflow_mode.set(setting.value); else throw Exception( "Unknown setting " + setting.name + " for storage " + args.engine_name, ErrorCodes::BAD_ARGUMENTS); } } return StorageJoin::create( args.data_path, args.table_name, key_names, join_use_nulls.value, SizeLimits{max_rows_in_join.value, max_bytes_in_join.value, join_overflow_mode.value}, kind, strictness, args.columns); }); } template static const char * rawData(T & t) { return reinterpret_cast(&t); } template static size_t rawSize(T &) { return sizeof(T); } template <> const char * rawData(const StringRef & t) { return t.data; } template <> size_t rawSize(const StringRef & t) { return t.size; } class JoinBlockInputStream : public IProfilingBlockInputStream { public: JoinBlockInputStream(const Join & parent_, size_t max_block_size_, Block && sample_block_) : parent(parent_), lock(parent.rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_)) { columns.resize(sample_block.columns()); column_indices.resize(sample_block.columns()); column_with_null.resize(sample_block.columns()); for (size_t i = 0; i < sample_block.columns(); ++i) { auto & [_, type, name] = sample_block.getByPosition(i); if (parent.sample_block_with_keys.has(name)) { key_pos = i; column_with_null[i] = parent.sample_block_with_keys.getByName(name).type->isNullable(); } else { auto pos = parent.sample_block_with_columns_to_add.getPositionByName(name); column_indices[i] = pos; column_with_null[i] = !parent.sample_block_with_columns_to_add.getByPosition(pos).type->equals(*type); } } } String getName() const override { return "Join"; } Block getHeader() const override { return sample_block; } protected: Block readImpl() override { if (parent.blocks.empty()) return Block(); if (parent.strictness == ASTTableJoin::Strictness::Any) return createBlock(parent.maps_any); else if (parent.strictness == ASTTableJoin::Strictness::All) return createBlock(parent.maps_all); else throw Exception("Logical error: unknown JOIN strictness (must be ANY or ALL)", ErrorCodes::LOGICAL_ERROR); } private: const Join & parent; std::shared_lock lock; size_t max_block_size; Block sample_block; ColumnNumbers column_indices; std::vector column_with_null; std::optional key_pos; MutableColumns columns; std::unique_ptr> position; /// type erasure template Block createBlock(const Maps & maps) { for (size_t i = 0; i < sample_block.columns(); ++i) { const auto & src_col = sample_block.safeGetByPosition(i); columns[i] = src_col.type->createColumn(); if (column_with_null[i]) { if (key_pos == i) { // unwrap null key column ColumnNullable & nullable_col = static_cast(*columns[i]); columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable(); } else // wrap non key column with null columns[i] = makeNullable(std::move(columns[i]))->assumeMutable(); } } size_t rows_added = 0; switch (parent.type) { #define M(TYPE) \ case Join::Type::TYPE: \ rows_added = fillColumns(*maps.TYPE); \ break; APPLY_FOR_JOIN_VARIANTS_LIMITED(M) #undef M default: throw Exception("Unknown JOIN keys variant for limited use", ErrorCodes::UNKNOWN_SET_DATA_VARIANT); } if (!rows_added) return {}; Block res = sample_block.cloneEmpty(); for (size_t i = 0; i < columns.size(); ++i) if (column_with_null[i]) { if (key_pos == i) res.getByPosition(i).column = makeNullable(std::move(columns[i]))->assumeMutable(); else { const ColumnNullable & nullable_col = static_cast(*columns[i]); res.getByPosition(i).column = nullable_col.getNestedColumnPtr(); } } else res.getByPosition(i).column = std::move(columns[i]); return res; } template size_t fillColumns(const Map & map) { size_t rows_added = 0; if (!position) position = decltype(position)( static_cast(new typename Map::const_iterator(map.begin())), [](void * ptr) { delete reinterpret_cast(ptr); }); auto & it = *reinterpret_cast(position.get()); auto end = map.end(); for (; it != end; ++it) { if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) { for (size_t j = 0; j < columns.size(); ++j) if (j == key_pos) columns[j]->insertData(rawData(it->first), rawSize(it->first)); else columns[j]->insertFrom(*it->second.block->getByPosition(column_indices[j]).column.get(), it->second.row_num); ++rows_added; } else for (auto current = &static_cast(it->second); current != nullptr; current = current->next) { for (size_t j = 0; j < columns.size(); ++j) if (j == key_pos) columns[j]->insertData(rawData(it->first), rawSize(it->first)); else columns[j]->insertFrom(*current->block->getByPosition(column_indices[j]).column.get(), current->row_num); ++rows_added; } if (rows_added >= max_block_size) { ++it; break; } } return rows_added; } }; // TODO: multiple stream read and index read BlockInputStreams StorageJoin::read( const Names & column_names, const SelectQueryInfo & /*query_info*/, const Context & /*context*/, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, unsigned /*num_streams*/) { check(column_names); return {std::make_shared(*join, max_block_size, getSampleBlockForColumns(column_names))}; } }