better code for join with dict

This commit is contained in:
vdimir 2022-08-08 10:58:28 +00:00
parent 44c688332a
commit 90fa2ed8c1
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
13 changed files with 183 additions and 219 deletions

View File

@ -5,16 +5,18 @@
#include <Core/Names.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Interpreters/IExternalLoadable.h>
#include <Interpreters/StorageID.h>
#include <Interpreters/IKeyValueEntity.h>
#include <Interpreters/castColumn.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <DataTypes/IDataType.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
@ -52,7 +54,7 @@ enum class DictionarySpecialKeyType
/**
* Base class for Dictionaries implementation.
*/
class IDictionary : public IExternalLoadable
class IDictionary : public IExternalLoadable, public IKeyValueEntity
{
public:
explicit IDictionary(const StorageID & dictionary_id_)
@ -290,6 +292,11 @@ public:
return dictionary_comment;
}
/// IKeyValueEntity implementation
Names getPrimaryKey() const override;
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & out_null_map, const Names & result_names) const override;
Block getSampleBlock(const Names & result_names) const override;
private:
mutable std::mutex mutex;
mutable StorageID dictionary_id TSA_GUARDED_BY(mutex);

View File

@ -0,0 +1,116 @@
#include <Dictionaries/IDictionary.h>
#include <Interpreters/JoinUtils.h>
namespace DB
{
/// Collects names and types of the dictionary attributes that have to be read.
/// Every entry of `result_names` that is an attribute of `structure` is appended, in the given order,
/// to `attribute_names` and `result_types`; entries that are not attributes (keys) are ignored.
/// If `result_names` is empty, every attribute of the structure is appended.
static void splitNamesAndTypesFromStructure(const DictionaryStructure & structure, const Names & result_names, Names & attribute_names, DataTypes & result_types)
{
    const auto append_attribute = [&](const auto & attribute)
    {
        attribute_names.emplace_back(attribute.name);
        result_types.emplace_back(attribute.type);
    };

    /// No explicit selection: take all attributes from the structure
    if (result_names.empty())
    {
        for (const auto & attribute : structure.attributes)
            append_attribute(attribute);
        return;
    }

    for (const auto & requested_name : result_names)
    {
        /// Names that are not attributes (keys) are skipped
        if (structure.hasAttribute(requested_name))
            append_attribute(structure.getAttribute(requested_name));
    }
}
/// IKeyValueEntity implementation: the primary key is the list of key names of the dictionary structure.
Names IDictionary::getPrimaryKey() const
{
    const auto & structure = getStructure();
    return structure.getKeysNames();
}
/// IKeyValueEntity implementation: looks up `keys` in the dictionary.
///
/// @param keys - key columns with their types. If empty, an empty chunk with the structure of
///               `getSampleBlock(result_names)` is returned and `out_null_map` is left untouched.
/// @param out_null_map - overwritten with the mask returned by `hasKeys`, one entry per requested row.
/// @param result_names - attributes to read; if empty, all attributes are read.
///                       Names that are not attributes of the dictionary are ignored.
///
/// @return chunk with the key columns first, in the order they were passed, followed by the attribute columns.
Chunk IDictionary::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & out_null_map, const Names & result_names) const
{
    if (keys.empty())
        return Chunk(getSampleBlock(result_names).cloneEmpty().getColumns(), 0);

    const auto & dictionary_structure = getStructure();

    /// Split column keys and types into separate vectors, to use in `IDictionary::getColumns`
    Columns key_columns;
    DataTypes key_types;
    key_columns.reserve(keys.size());
    key_types.reserve(keys.size());
    for (const auto & key : keys)
    {
        key_columns.emplace_back(key.column);
        key_types.emplace_back(key.type);
    }

    /// Fill null map
    {
        out_null_map.clear();

        auto mask = hasKeys(key_columns, key_types);
        const auto & mask_data = mask->getData();

        out_null_map.resize(mask_data.size(), 0);
        std::copy(mask_data.begin(), mask_data.end(), out_null_map.begin());
    }

    Names attribute_names;
    DataTypes result_types;
    splitNamesAndTypesFromStructure(dictionary_structure, result_names, attribute_names, result_types);

    /// Dictionary may have non-standard default values specified,
    /// so explicit constant columns with the default value of each result type are passed to `getColumns`.
    Columns default_cols(result_types.size());
    for (size_t i = 0; i < result_types.size(); ++i)
        default_cols[i] = result_types[i]->createColumnConstWithDefaultValue(out_null_map.size());

    Columns attribute_columns = getColumns(attribute_names, result_types, key_columns, key_types, default_cols);

    /// Result block should consist of key columns and then attributes.
    /// Keys are appended in the order they were passed: inserting each of them at the front
    /// would reverse compound keys relative to the header returned by `getSampleBlock`.
    Columns result_columns;
    result_columns.reserve(key_columns.size() + attribute_columns.size());
    for (const auto & key_col : key_columns)
    {
        /// Insert default values for keys that were not found
        result_columns.emplace_back(JoinCommon::filterWithBlanks(key_col, out_null_map));
    }
    for (auto & attribute_column : attribute_columns)
        result_columns.emplace_back(std::move(attribute_column));

    /// `keys` is not empty here, so there is at least one key column
    size_t num_rows = result_columns.front()->size();
    return Chunk(std::move(result_columns), num_rows);
}
/// IKeyValueEntity implementation: header for the result of `getByKeys`.
/// Key columns come first, then the attributes listed in `result_names`
/// (all attributes of the structure if `result_names` is empty).
/// Columns are inserted without data, only with type and name.
Block IDictionary::getSampleBlock(const Names & result_names) const
{
    const auto & structure = getStructure();
    const auto & types_of_keys = structure.getKeyTypes();
    const auto & names_of_keys = structure.getKeysNames();

    Block header;
    for (size_t pos = 0; pos != types_of_keys.size(); ++pos)
        header.insert(ColumnWithTypeAndName(nullptr, types_of_keys.at(pos), names_of_keys.at(pos)));

    if (!result_names.empty())
    {
        for (const auto & requested_name : result_names)
        {
            /// Names that are not attributes (keys) are skipped
            if (structure.hasAttribute(requested_name))
            {
                const auto & attribute = structure.getAttribute(requested_name);
                header.insert(ColumnWithTypeAndName(nullptr, attribute.type, requested_name));
            }
        }
        return header;
    }

    for (const auto & attribute : structure.attributes)
        header.insert(ColumnWithTypeAndName(nullptr, attribute.type, attribute.name));

    return header;
}
}

View File

@ -1,101 +1,8 @@
#include <Interpreters/DictionaryJoinAdapter.h>
#include <Interpreters/JoinUtils.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsExternalDictionaries.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Prepares the adapter for `dictionary_`.
/// The sample block gets the dictionary key columns followed by the attributes from `result_column_names`;
/// names and types of those attributes are stored to be used in `getByKeys`.
/// Throws LOGICAL_ERROR if the dictionary pointer is null or the numbers of key types and key names differ.
DictionaryJoinAdapter::DictionaryJoinAdapter(
    std::shared_ptr<const IDictionary> dictionary_, const Names & result_column_names)
    : IKeyValueEntity()
    , dictionary(dictionary_)
{
    if (!dictionary)
        throw Exception("Dictionary is not initialized", ErrorCodes::LOGICAL_ERROR);

    const auto & structure = dictionary->getStructure();
    const auto & types_of_keys = structure.getKeyTypes();
    const auto & names_of_keys = structure.getKeysNames();

    if (types_of_keys.size() != names_of_keys.size())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary '{}' has invalid structure", dictionary->getFullName());

    /// Keys go first
    for (size_t pos = 0; pos != types_of_keys.size(); ++pos)
        sample_block.insert(ColumnWithTypeAndName(nullptr, types_of_keys[pos], names_of_keys[pos]));

    /// Then the requested attributes
    for (const auto & requested_name : result_column_names)
    {
        const auto & attribute = structure.getAttribute(requested_name);

        sample_block.insert(ColumnWithTypeAndName(nullptr, attribute.type, requested_name));
        attribute_names.emplace_back(requested_name);
        result_types.emplace_back(attribute.type);
    }
}
/// Primary key is the list of key names of the wrapped dictionary structure.
Names DictionaryJoinAdapter::getPrimaryKey() const
{
    const auto & structure = dictionary->getStructure();
    return structure.getKeysNames();
}
/// Returns a copy of the header stored in the `sample_block` member.
Block DictionaryJoinAdapter::getSampleBlock() const
{
    Block header(sample_block);
    return header;
}
/// Looks up `keys` in the wrapped dictionary.
///
/// @param keys - key columns with their types. If empty, an empty chunk is returned
///               and `out_null_map` is left untouched.
/// @param out_null_map - overwritten with the mask returned by `IDictionary::hasKeys`, one entry per requested row.
///
/// @return chunk with the key columns first, in the order they were passed,
///         followed by the attributes selected in the constructor.
Chunk DictionaryJoinAdapter::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & out_null_map) const
{
    if (keys.empty())
        return {};

    /// Split column keys and types into separate vectors, to use in `IDictionary::getColumns`
    Columns key_columns;
    DataTypes key_types;
    key_columns.reserve(keys.size());
    key_types.reserve(keys.size());
    for (const auto & key : keys)
    {
        key_columns.emplace_back(key.column);
        key_types.emplace_back(key.type);
    }

    /// Fill null map
    {
        out_null_map.clear();

        auto mask = dictionary->hasKeys(key_columns, key_types);
        const auto & mask_data = mask->getData();

        out_null_map.resize(mask_data.size(), 0);
        std::copy(mask_data.begin(), mask_data.end(), out_null_map.begin());
    }

    /// Dictionary may have non-standard default values specified,
    /// so explicit constant columns with the default value of each result type are passed to `getColumns`.
    Columns default_cols(result_types.size());
    for (size_t i = 0; i < result_types.size(); ++i)
        default_cols[i] = result_types[i]->createColumnConstWithDefaultValue(out_null_map.size());

    Columns attribute_columns = dictionary->getColumns(attribute_names, result_types, key_columns, key_types, default_cols);

    /// Result block consists of key columns and then attributes.
    /// Keys are appended in the order they were passed: inserting each of them at the front
    /// would reverse compound keys relative to the sample block built in the constructor.
    Columns result_columns;
    result_columns.reserve(key_columns.size() + attribute_columns.size());
    for (const auto & key_col : key_columns)
    {
        /// Insert default values for keys that were not found
        result_columns.emplace_back(JoinCommon::filterWithBlanks(key_col, out_null_map));
    }
    for (auto & attribute_column : attribute_columns)
        result_columns.emplace_back(std::move(attribute_column));

    /// `keys` is not empty here, so there is at least one key column
    size_t num_rows = result_columns.front()->size();
    return Chunk(std::move(result_columns), num_rows);
}
}

View File

@ -1,32 +1,7 @@
#pragma once
#include <Core/Block.h>
#include <Columns/ColumnVector.h>
#include <Dictionaries/IDictionary.h>
#include <Interpreters/IKeyValueEntity.h>
namespace DB
{
/// Used in join with dictionary to provide sufficient interface to DirectJoin.
/// Wraps a dictionary and exposes it through the IKeyValueEntity interface
/// (primary key, lookup by keys, sample block).
class DictionaryJoinAdapter : public IKeyValueEntity
{
public:
/// @param dictionary_ - dictionary to read from.
/// @param result_column_names - names of the dictionary attributes to return from `getByKeys`.
DictionaryJoinAdapter(std::shared_ptr<const IDictionary> dictionary_, const Names & result_column_names);
/// Names of the dictionary key columns.
Names getPrimaryKey() const override;
/// Returns a chunk with the key columns followed by the selected attributes;
/// `out_null_map` is filled from the result of the dictionary `hasKeys` call.
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & out_null_map) const override;
/// Header for the `getByKeys` result.
Block getSampleBlock() const override;
private:
/// Dictionary used for lookups.
std::shared_ptr<const IDictionary> dictionary;
/// Key columns followed by the selected attributes (types and names only).
Block sample_block;
/// Names of the selected attributes, parallel to `result_types`.
Strings attribute_names;
/// Types of the selected attributes, parallel to `attribute_names`.
DataTypes result_types;
};
}

View File

@ -63,7 +63,7 @@ static MutableColumns convertBlockStructure(
DirectKeyValueJoin::DirectKeyValueJoin(std::shared_ptr<TableJoin> table_join_,
const Block & right_sample_block_,
std::shared_ptr<IKeyValueEntity> storage_)
std::shared_ptr<const IKeyValueEntity> storage_)
: table_join(table_join_)
, storage(storage_)
, right_sample_block(right_sample_block_)
@ -113,12 +113,14 @@ void DirectKeyValueJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> &)
if (!key_col.column)
return;
Block original_right_block = originalRightBlock(right_sample_block, *table_join);
const Names & attribute_names = original_right_block.getNames();
NullMap null_map;
Chunk joined_chunk = storage->getByKeys({key_col}, null_map);
Chunk joined_chunk = storage->getByKeys({key_col}, null_map, attribute_names);
/// Expected right block may differ from structure in storage, because of `join_use_nulls` or we just select not all joined attributes
Block original_right_block = originalRightBlock(right_sample_block, *table_join);
Block sample_storage_block = storage->getSampleBlock();
Block sample_storage_block = storage->getSampleBlock(attribute_names);
MutableColumns result_columns = convertBlockStructure(sample_storage_block, original_right_block, joined_chunk.mutateColumns(), null_map);
for (size_t i = 0; i < result_columns.size(); ++i)

View File

@ -23,7 +23,7 @@ public:
DirectKeyValueJoin(
std::shared_ptr<TableJoin> table_join_,
const Block & right_sample_block_,
std::shared_ptr<IKeyValueEntity> storage_);
std::shared_ptr<const IKeyValueEntity> storage_);
virtual const TableJoin & getTableJoin() const override { return *table_join; }
@ -50,7 +50,7 @@ public:
private:
std::shared_ptr<TableJoin> table_join;
std::shared_ptr<IKeyValueEntity> storage;
std::shared_ptr<const IKeyValueEntity> storage;
Block right_sample_block;
Block sample_block_with_columns_to_add;
Poco::Logger * log;

View File

@ -1008,7 +1008,6 @@ static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoi
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}
std::shared_ptr<DirectKeyValueJoin> tryDictJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block, ContextPtr context);
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block);
static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> analyzed_join, std::unique_ptr<QueryPlan> & joined_plan, ContextPtr context)
@ -1017,10 +1016,7 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> ana
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
{
JoinPtr direct_join = nullptr;
direct_join = direct_join ? direct_join : tryKeyValueJoin(analyzed_join, right_sample_block);
direct_join = direct_join ? direct_join : tryDictJoin(analyzed_join, right_sample_block, context);
JoinPtr direct_join = tryKeyValueJoin(analyzed_join, right_sample_block);
if (direct_join)
{
/// Do not need to execute plan for right part, it's ready.
@ -1112,67 +1108,6 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
return joined_plan;
}
std::shared_ptr<DirectKeyValueJoin> tryDictJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block, ContextPtr context)
{
bool allowed_inner = isInner(analyzed_join->kind()) && analyzed_join->strictness() == JoinStrictness::All;
bool allowed_left = isLeft(analyzed_join->kind()) && (analyzed_join->strictness() == JoinStrictness::Any ||
analyzed_join->strictness() == JoinStrictness::All ||
analyzed_join->strictness() == JoinStrictness::Semi ||
analyzed_join->strictness() == JoinStrictness::Anti);
if (!allowed_inner && !allowed_left)
{
LOG_TRACE(getLogger(), "Can't use dictionary join: {} {} is not supported",
analyzed_join->kind(), analyzed_join->strictness());
return nullptr;
}
if (analyzed_join->getClauses().size() != 1 || analyzed_join->getClauses()[0].key_names_right.size() != 1)
{
LOG_TRACE(getLogger(), "Can't use dictionary join: only one key is supported");
return nullptr;
}
const auto & right_key = analyzed_join->getOnlyClause().key_names_right[0];
const auto & dictionary_name = analyzed_join->getRightStorageName();
if (dictionary_name.empty())
{
LOG_TRACE(getLogger(), "Can't use dictionary join: dictionary was not found");
return nullptr;
}
FunctionDictHelper dictionary_helper(context);
auto dictionary = dictionary_helper.getDictionary(dictionary_name);
if (!dictionary)
{
LOG_TRACE(getLogger(), "Can't use dictionary join: dictionary was not found");
return nullptr;
}
const auto & dict_keys = dictionary->getStructure().getKeysNames();
if (dict_keys.size() != 1 || dict_keys[0] != analyzed_join->getOriginalName(right_key))
{
LOG_TRACE(getLogger(), "Can't use dictionary join: join key '{}' doesn't natch to dictionary key ({})",
right_key, fmt::join(dict_keys, ", "));
return nullptr;
}
Names attr_names;
for (const auto & col : right_sample_block)
{
if (col.name == right_key)
continue;
const auto & original_name = analyzed_join->getOriginalName(col.name);
if (dictionary->getStructure().hasAttribute(original_name))
attr_names.push_back(original_name);
}
auto dict_reader = std::make_shared<DictionaryJoinAdapter>(dictionary, attr_names);
return std::make_shared<DirectKeyValueJoin>(analyzed_join, right_sample_block, dict_reader);
}
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block)
{
if (!analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
@ -1180,19 +1115,17 @@ std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> a
auto storage = analyzed_join->getStorageKeyValue();
if (!storage)
{
return nullptr;
}
if (!isInnerOrLeft(analyzed_join->kind()))
{
return nullptr;
}
if (analyzed_join->strictness() != JoinStrictness::All &&
analyzed_join->strictness() != JoinStrictness::Any &&
analyzed_join->strictness() != JoinStrictness::RightAny)
bool allowed_inner = isInner(analyzed_join->kind()) && analyzed_join->strictness() == JoinStrictness::All;
bool allowed_left = isLeft(analyzed_join->kind()) && (analyzed_join->strictness() == JoinStrictness::Any ||
analyzed_join->strictness() == JoinStrictness::All ||
analyzed_join->strictness() == JoinStrictness::Semi ||
analyzed_join->strictness() == JoinStrictness::Anti);
if (!allowed_inner && !allowed_left)
{
LOG_TRACE(getLogger(), "Can't use direct join: {} {} is not supported",
analyzed_join->kind(), analyzed_join->strictness());
return nullptr;
}
@ -1205,6 +1138,7 @@ std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> a
if (!only_one_key)
{
LOG_TRACE(getLogger(), "Can't use direct join: only one key is supported");
return nullptr;
}
@ -1213,6 +1147,8 @@ std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> a
const auto & storage_primary_key = storage->getPrimaryKey();
if (storage_primary_key.size() != 1 || storage_primary_key[0] != original_key_name)
{
LOG_TRACE(getLogger(), "Can't use direct join: join key '{}' doesn't match to storage key ({})",
original_key_name, fmt::join(storage_primary_key, ", "));
return nullptr;
}

View File

@ -10,6 +10,9 @@ namespace DB
class IKeyValueEntity
{
public:
IKeyValueEntity() = default;
virtual ~IKeyValueEntity() = default;
/// Get primary key name that supports key-value requests.
/// Primary key can consist of multiple columns.
virtual Names getPrimaryKey() const = 0;
@ -19,17 +22,20 @@ public:
*
* @param keys - keys to get data for. Key can be compound and represented by several columns.
* @param out_null_map - output parameter indicating which keys were not found.
* @param required_columns - if we don't need all attributes, implementation can use it to benefit from reading a subset of them.
*
* @return - chunk of data corresponding to the keys.
* Number of rows in chunk is equal to size of columns in keys.
* If the key was not found row would have a default value.
*/
virtual Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & out_null_map) const = 0;
virtual Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & out_null_map, const Names & required_columns) const = 0;
/// Header for getByKeys result
virtual Block getSampleBlock() const = 0;
virtual Block getSampleBlock(const Names & required_columns) const = 0;
virtual ~IKeyValueEntity() = default;
protected:
/// Names of result columns. If empty then all columns are required.
Names key_value_result_names;
};
}

View File

@ -7,6 +7,7 @@
#include <Interpreters/InJoinSubqueriesPreprocessor.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/getTableExpressions.h>
#include <Functions/FunctionsExternalDictionaries.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTFunction.h>
@ -320,12 +321,25 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
{
table_join->setStorageJoin(storage_join);
}
else if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage);
if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage);
storage_dict && join_algorithm.isSet(JoinAlgorithm::DIRECT))
{
table_join->setRightStorageName(storage_dict->getDictionaryName());
FunctionDictHelper dictionary_helper(context);
auto dictionary_name = storage_dict->getDictionaryName();
auto dictionary = dictionary_helper.getDictionary(dictionary_name);
if (!dictionary)
{
LOG_TRACE(&Poco::Logger::get("JoinedTables"), "Can't use dictionary join: dictionary '{}' was not found", dictionary_name);
return nullptr;
}
else if (auto storage_kv = std::dynamic_pointer_cast<IKeyValueEntity>(storage);
auto dictionary_kv = std::dynamic_pointer_cast<const IKeyValueEntity>(dictionary);
table_join->setStorageJoin(dictionary_kv);
}
if (auto storage_kv = std::dynamic_pointer_cast<IKeyValueEntity>(storage);
storage_kv && join_algorithm.isSet(JoinAlgorithm::DIRECT))
{
table_join->setStorageJoin(storage_kv);

View File

@ -645,7 +645,7 @@ ActionsDAGPtr TableJoin::applyKeyConvertToTable(
return dag_stage1;
}
void TableJoin::setStorageJoin(std::shared_ptr<IKeyValueEntity> storage)
void TableJoin::setStorageJoin(std::shared_ptr<const IKeyValueEntity> storage)
{
right_kv_storage = storage;
}

View File

@ -140,7 +140,7 @@ private:
std::shared_ptr<StorageJoin> right_storage_join;
std::shared_ptr<IKeyValueEntity> right_kv_storage;
std::shared_ptr<const IKeyValueEntity> right_kv_storage;
std::string right_storage_name;
@ -304,14 +304,14 @@ public:
void setRightStorageName(const std::string & storage_name);
const std::string & getRightStorageName() const;
void setStorageJoin(std::shared_ptr<IKeyValueEntity> storage);
void setStorageJoin(std::shared_ptr<const IKeyValueEntity> storage);
void setStorageJoin(std::shared_ptr<StorageJoin> storage);
std::shared_ptr<StorageJoin> getStorageJoin() { return right_storage_join; }
bool isSpecialStorage() const { return !right_storage_name.empty() || right_storage_join || right_kv_storage; }
std::shared_ptr<IKeyValueEntity> getStorageKeyValue() { return right_kv_storage; }
std::shared_ptr<const IKeyValueEntity> getStorageKeyValue() { return right_kv_storage; }
};
}

View File

@ -559,7 +559,8 @@ std::vector<rocksdb::Status> StorageEmbeddedRocksDB::multiGet(const std::vector<
Chunk StorageEmbeddedRocksDB::getByKeys(
const ColumnsWithTypeAndName & keys,
PaddedPODArray<UInt8> & null_map) const
PaddedPODArray<UInt8> & null_map,
const Names &) const
{
if (keys.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageEmbeddedRocksDB supports only one key, got: {}", keys.size());
@ -572,7 +573,7 @@ Chunk StorageEmbeddedRocksDB::getByKeys(
return getBySerializedKeys(raw_keys, &null_map);
}
Block StorageEmbeddedRocksDB::getSampleBlock() const
Block StorageEmbeddedRocksDB::getSampleBlock(const Names &) const
{
auto metadata = getInMemoryMetadataPtr();
return metadata ? metadata->getSampleBlock() : Block();

View File

@ -63,9 +63,9 @@ public:
std::vector<rocksdb::Status> multiGet(const std::vector<rocksdb::Slice> & slices_keys, std::vector<String> & values) const;
Names getPrimaryKey() const override { return {primary_key}; }
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map) const override;
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
Block getSampleBlock() const override;
Block getSampleBlock(const Names &) const override;
/// Return chunk with data for given serialized keys.
/// If out_null_map is passed, fill it with 1/0 depending on key was/wasn't found. Result chunk may contain default values.