Fix StorageJoin and Asof or join_use_nulls in pipeline

This commit is contained in:
vdimir 2022-02-11 20:26:46 +00:00
parent ee09ec4dd1
commit 99ca89c0ca
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
9 changed files with 105 additions and 37 deletions

View File

@ -74,9 +74,9 @@ FunctionBasePtr JoinGetOverloadResolver<or_null>::buildImpl(const ColumnsWithTyp
{
if (arguments.size() < 3)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", should be greater or equal to 3",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function '{}' doesn't match: passed {}, should be greater or equal to 3",
getName() , arguments.size());
auto [storage_join, attr_name] = getJoin(arguments, getContext());
DataTypes data_types(arguments.size() - 2);
DataTypes argument_types(arguments.size());
@ -86,9 +86,13 @@ FunctionBasePtr JoinGetOverloadResolver<or_null>::buildImpl(const ColumnsWithTyp
data_types[i - 2] = arguments[i].type;
argument_types[i] = arguments[i].type;
}
auto return_type = storage_join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null);
auto return_type = storage_join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null || storage_join->useNulls());
auto table_lock = storage_join->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
if (storage_join->useNulls())
return std::make_unique<FunctionJoinGet<true>>(getContext(), table_lock, storage_join, attr_name, argument_types, return_type);
return std::make_unique<FunctionJoinGet<or_null>>(getContext(), table_lock, storage_join, attr_name, argument_types, return_type);
}

View File

@ -991,7 +991,8 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
if (auto storage = analyzed_join->getStorageJoin())
{
std::tie(left_convert_actions, right_convert_actions) = analyzed_join->createConvertingActions(left_columns, {});
auto right_columns = storage->getRightSampleBlock().getColumnsWithTypeAndName();
std::tie(left_convert_actions, right_convert_actions) = analyzed_join->createConvertingActions(left_columns, right_columns);
return storage->getJoinLocked(analyzed_join, getContext());
}

View File

@ -306,9 +306,6 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
if (key_columns.size() <= 1)
throw Exception("ASOF join needs at least one equi-join column", ErrorCodes::SYNTAX_ERROR);
if (right_table_keys.getByName(key_names_right.back()).type->isNullable())
throw Exception("ASOF join over right table Nullable column is not implemented", ErrorCodes::NOT_IMPLEMENTED);
size_t asof_size;
asof_type = AsofRowRefs::getTypeSize(*key_columns.back(), asof_size);
key_columns.pop_back();

View File

@ -3,6 +3,7 @@
#include <Common/Exception.h>
#include <base/types.h>
#include <Common/StringUtils/StringUtils.h>
#include <Interpreters/ActionsDAG.h>
#include <Core/Block.h>
#include <Core/ColumnsWithTypeAndName.h>
@ -25,6 +26,8 @@
#include <base/logger_useful.h>
#include <algorithm>
#include <string>
#include <vector>
namespace DB
@ -471,6 +474,31 @@ TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_co
auto left_converting_actions = applyKeyConvertToTable(left_sample_columns, left_type_map, left_key_column_rename, forceNullableLeft());
auto right_converting_actions = applyKeyConvertToTable(right_sample_columns, right_type_map, right_key_column_rename, forceNullableRight());
{
auto log_actions = [](const String & side, const ActionsDAGPtr & dag)
{
if (dag)
{
std::vector<std::string> input_cols;
for (const auto & col : dag->getRequiredColumns())
input_cols.push_back(col.name + ": " + col.type->getName());
std::vector<std::string> output_cols;
for (const auto & col : dag->getResultColumns())
output_cols.push_back(col.name + ": " + col.type->getName());
LOG_DEBUG(&Poco::Logger::get("TableJoin"), "{} JOIN converting actions: [{}] -> [{}]",
side, fmt::join(input_cols, ", "), fmt::join(output_cols, ", "));
}
else
{
LOG_DEBUG(&Poco::Logger::get("TableJoin"), "{} JOIN converting actions: empty", side);
}
};
log_actions("Left", left_converting_actions);
log_actions("Right", right_converting_actions);
}
forAllKeys(clauses, [&](auto & left_key, auto & right_key)
{
renameIfNeeded(left_key, left_key_column_rename);
@ -482,10 +510,18 @@ TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_co
}
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
bool TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right)
void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right)
{
if (strictness() == ASTTableJoin::Strictness::Asof)
{
if (clauses.size() != 1)
throw DB::Exception("ASOF join over multiple keys is not supported", ErrorCodes::NOT_IMPLEMENTED);
if (right.back().type->isNullable())
throw DB::Exception("ASOF join over right table Nullable column is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
if (!left_type_map.empty() || !right_type_map.empty())
return true;
return;
NameToTypeMap left_types;
for (const auto & col : left)
@ -544,8 +580,6 @@ bool TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const Rig
formatTypeMap(left_type_map, left_types),
formatTypeMap(right_type_map, right_types));
}
return !left_type_map.empty();
}
@ -574,7 +608,9 @@ static ActionsDAGPtr makeConvertingDag(
}
ActionsDAGPtr TableJoin::applyKeyConvertToTable(
const ColumnsWithTypeAndName & cols_src, const NameToTypeMap & type_mapping, NameToNameMap & key_column_rename,
const ColumnsWithTypeAndName & cols_src,
const NameToTypeMap & type_mapping,
NameToNameMap & key_column_rename,
bool make_nullable) const
{
auto dag1 = makeConvertingDag(
@ -603,9 +639,7 @@ ActionsDAGPtr TableJoin::applyKeyConvertToTable(
{
if (added_cols.contains(col.name) && !hasUsing())
return false;
if (!col.type->canBeInsideNullable())
return false;
col.type = makeNullable(col.type);
col.type = JoinCommon::convertTypeToNullable(col.type);
return true;
},
nullptr, false);

View File

@ -161,7 +161,7 @@ private:
/// Calculates common supertypes for corresponding join key columns.
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
bool inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right);
void inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right);
NamesAndTypesList correctedColumnsAddedByJoin() const;

View File

@ -119,6 +119,9 @@ bool canBecomeNullable(const DataTypePtr & type)
/// Note: LowCardinality(T) transformed to LowCardinality(Nullable(T))
DataTypePtr convertTypeToNullable(const DataTypePtr & type)
{
if (type->isNullable())
return type;
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
{
const auto & dict_type = low_cardinality_type->getDictionaryType();
@ -425,10 +428,13 @@ void checkTypesOfKeys(const Block & block_left, const Names & key_names_left,
DataTypePtr right_type = removeNullable(recursiveRemoveLowCardinality(block_right.getByName(key_names_right[i]).type));
if (!left_type->equals(*right_type))
throw Exception("Type mismatch of columns to JOIN by: "
+ key_names_left[i] + " " + left_type->getName() + " at left, "
+ key_names_right[i] + " " + right_type->getName() + " at right",
ErrorCodes::TYPE_MISMATCH);
{
throw DB::Exception(
ErrorCodes::TYPE_MISMATCH,
"Type mismatch of columns to JOIN by: {} {} at left, {} {} at right",
key_names_left[i], left_type->getName(),
key_names_right[i], right_type->getName());
}
}
}

View File

@ -14,6 +14,7 @@
#include <Interpreters/castColumn.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>
#include <Interpreters/join_common.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Processors/Sources/SourceWithProgress.h>
@ -64,7 +65,7 @@ StorageJoin::StorageJoin(
throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
table_join = std::make_shared<TableJoin>(limits, use_nulls, kind, strictness, key_names);
join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
restore();
}
@ -82,7 +83,7 @@ SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataP
return StorageSetOrJoinBase::write(query, metadata_snapshot, context);
}
void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, TableExclusiveLockHolder &)
void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &)
{
std::lock_guard mutate_lock(mutate_mutex);
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
@ -92,7 +93,7 @@ void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_s
disk->createDirectories(path + "tmp/");
increment = 0;
join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
}
void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const
@ -116,7 +117,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
auto compressed_backup_buf = CompressedWriteBuffer(*backup_buf);
auto backup_stream = NativeWriter(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock());
auto new_data = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
auto new_data = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
// New scope controls lifetime of InputStream.
{
@ -167,8 +168,10 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join,
if ((analyzed_join->forceNullableRight() && !use_nulls) ||
(!analyzed_join->forceNullableRight() && isLeftOrFull(analyzed_join->kind()) && use_nulls))
throw Exception("Table " + getStorageID().getNameForLogs() + " needs the same join_use_nulls setting as present in LEFT or FULL JOIN.",
ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
throw Exception(
ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN,
"Table {} needs the same join_use_nulls setting as present in LEFT or FULL JOIN",
getStorageID().getNameForLogs());
/// TODO: check key columns
@ -177,7 +180,7 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join,
/// Qualifies will be added by join implementation (HashJoin)
analyzed_join->setRightKeys(key_names);
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, metadata_snapshot->getSampleBlock().sortColumns());
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, getRightSampleBlock());
RWLockImpl::LockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
join_clone->setLock(holder);

View File

@ -6,6 +6,7 @@
#include <Storages/StorageSet.h>
#include <Storages/TableLockHolder.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/join_common.h>
namespace DB
@ -60,6 +61,22 @@ public:
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
Block getRightSampleBlock() const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
Block block = metadata_snapshot->getSampleBlock().sortColumns();
if (use_nulls && isLeftOrFull(kind))
{
for (auto & col : block)
{
JoinCommon::convertColumnToNullable(col);
}
}
return block;
}
bool useNulls() const { return use_nulls; }
private:
Block sample_block;
const Names key_names;

View File

@ -18,13 +18,13 @@ SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(ma
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
ORDER BY a.dt; -- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
ORDER BY a.dt; -- { serverError 48 }
select 'left asof on';
@ -44,13 +44,13 @@ SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(ma
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }
ORDER BY a.dt; -- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF LEFT JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }
ON a.dt >= b.dt AND a.pk = b.pk
ORDER BY a.dt; -- { serverError 48 }
select 'asof using';
@ -70,13 +70,13 @@ SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(ma
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
ORDER BY a.dt; -- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
USING(pk, dt)
ORDER BY a.dt;-- { serverError 48 }
ORDER BY a.dt; -- { serverError 48 }
select 'asof on';
@ -96,10 +96,16 @@ SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(ma
FROM (SELECT toUInt8(number) > 0 as pk, toUInt8(number) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }
ORDER BY a.dt; -- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.pk = b.pk AND a.dt >= b.dt
ORDER BY a.dt;-- { serverError 48 }
ORDER BY a.dt; -- { serverError 48 }
SELECT a.pk, b.pk, a.dt, b.dt, toTypeName(a.pk), toTypeName(b.pk), toTypeName(materialize(a.dt)), toTypeName(materialize(b.dt))
FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM numbers(3)) a
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.dt >= b.dt AND a.pk = b.pk
ORDER BY a.dt; -- { serverError 48 }