revrite NonJoinedBlockInputStream (in progress)

This commit is contained in:
chertus 2019-11-01 20:41:07 +03:00
parent 0f68c5a12a
commit 334b91f351
5 changed files with 324 additions and 229 deletions

View File

@ -104,13 +104,23 @@ static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column,
return std::move(column); return std::move(column);
} }
static void changeNullability(MutableColumnPtr & mutable_column)
{
ColumnPtr column = std::move(mutable_column);
if (auto * nullable = checkAndGetColumn<ColumnNullable>(*column))
column = nullable->getNestedColumnPtr();
else
column = makeNullable(column);
mutable_column = (*std::move(column)).mutate();
}
Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_) Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_)
: table_join(table_join_) : table_join(table_join_)
, kind(table_join->kind()) , kind(table_join->kind())
, strictness(table_join->strictness()) , strictness(table_join->strictness())
, key_names_right(table_join->keyNamesRight()) , key_names_right(table_join->keyNamesRight())
, required_right_keys(table_join->requiredRightKeys())
, nullable_right_side(table_join->forceNullableRight()) , nullable_right_side(table_join->forceNullableRight())
, nullable_left_side(table_join->forceNullableLeft()) , nullable_left_side(table_join->forceNullableLeft())
, any_take_last_row(any_take_last_row_) , any_take_last_row(any_take_last_row_)
@ -305,6 +315,13 @@ void Join::setSampleBlock(const Block & block)
ColumnRawPtrs key_columns = JoinCommon::extractKeysForJoin(key_names_right, block, right_table_keys, sample_block_with_columns_to_add); ColumnRawPtrs key_columns = JoinCommon::extractKeysForJoin(key_names_right, block, right_table_keys, sample_block_with_columns_to_add);
initRightBlockStructure();
initRequiredRightKeys();
JoinCommon::createMissedColumns(sample_block_with_columns_to_add);
if (nullable_right_side)
JoinCommon::convertColumnsToNullable(sample_block_with_columns_to_add);
if (strictness == ASTTableJoin::Strictness::Asof) if (strictness == ASTTableJoin::Strictness::Asof)
{ {
if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner) if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner)
@ -338,14 +355,6 @@ void Join::setSampleBlock(const Block & block)
/// Choose data structure to use for JOIN. /// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes)); init(chooseMethod(key_columns, key_sizes));
} }
blocklist_sample = Block(block.getColumnsWithTypeAndName());
prepareBlockListStructure(blocklist_sample);
JoinCommon::createMissedColumns(sample_block_with_columns_to_add);
if (nullable_right_side)
JoinCommon::convertColumnsToNullable(sample_block_with_columns_to_add);
} }
namespace namespace
@ -459,39 +468,57 @@ namespace
} }
} }
void Join::prepareBlockListStructure(Block & stored_block) void Join::initRequiredRightKeys()
{
const Names & left_keys = table_join->keyNamesLeft();
const Names & right_keys = table_join->keyNamesRight();
NameSet required_keys(table_join->requiredRightKeys().begin(), table_join->requiredRightKeys().end());
for (size_t i = 0; i < right_keys.size(); ++i)
{
const String & right_key_name = right_keys[i];
if (required_keys.count(right_key_name) && !required_right_keys.has(right_key_name))
{
const auto & right_key = right_table_keys.getByName(right_key_name);
required_right_keys.insert(right_key);
required_right_keys_sources.push_back(left_keys[i]);
}
}
}
void Join::initRightBlockStructure()
{ {
if (isRightOrFull(kind)) if (isRightOrFull(kind))
{ {
/** Move the key columns to the beginning of the block. /// Save keys for NonJoinedBlockInputStream
* This is where NonJoinedBlockInputStream will expect. saved_block_sample = right_table_keys.cloneEmpty();
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block.getPositionByName(name);
ColumnWithTypeAndName col = stored_block.safeGetByPosition(pos);
stored_block.erase(pos);
stored_block.insert(key_num, std::move(col));
++key_num;
}
} }
else else if (strictness == ASTTableJoin::Strictness::Asof)
{ {
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section /// Save ASOF key
saved_block_sample.insert(right_table_keys.safeGetByPosition(right_table_keys.columns() - 1));
/// Remove the key columns from stored_block, as they are not needed.
/// However, do not erase the ASOF column if this is an asof join
for (const auto & name : key_names_right)
{
if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back())
break; // this is the last column so break is OK
if (!erased.count(name))
stored_block.erase(stored_block.getPositionByName(name));
erased.insert(name);
}
} }
/// Save non key columns
for (auto & column : sample_block_with_columns_to_add)
saved_block_sample.insert(column);
}
Block * Join::storeRightBlock(const Block & block)
{
Block structured_block;
for (auto & columns : saved_block_sample.getColumnsWithTypeAndName())
structured_block.insert(block.getByName(columns.name));
/// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them.
materializeBlockInplace(structured_block);
if (nullable_right_side)
JoinCommon::convertColumnsToNullable(structured_block, (isFull(kind) ? right_table_keys.columns() : 0));
blocks.push_back(structured_block);
return &blocks.back();
} }
bool Join::addJoinedBlock(const Block & block) bool Join::addJoinedBlock(const Block & block)
@ -510,20 +537,10 @@ bool Join::addJoinedBlock(const Block & block)
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);
size_t rows = block.rows(); size_t rows = block.rows();
if (rows) if (rows)
has_no_rows_in_maps = false; has_no_rows_in_maps = false;
blocks.push_back(block); Block * stored_block = storeRightBlock(block);
Block * stored_block = &blocks.back();
prepareBlockListStructure(*stored_block);
/// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them.
materializeBlockInplace(*stored_block);
if (nullable_right_side)
JoinCommon::convertColumnsToNullable(*stored_block, (isFull(kind) ? key_names_right.size() : 0));
if (kind != ASTTableJoin::Kind::Cross) if (kind != ASTTableJoin::Kind::Cross)
{ {
@ -559,7 +576,7 @@ public:
AddedColumns(const Block & sample_block_with_columns_to_add, AddedColumns(const Block & sample_block_with_columns_to_add,
const Block & block_with_columns_to_add, const Block & block_with_columns_to_add,
const Block & block, const Block & block,
const Block & blocklist_sample, const Block & saved_block_sample,
const ColumnsWithTypeAndName & extras) const ColumnsWithTypeAndName & extras)
{ {
size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
@ -581,7 +598,7 @@ public:
addColumn(extra); addColumn(extra);
for (auto & tn : type_name) for (auto & tn : type_name)
right_indexes.push_back(blocklist_sample.getPositionByName(tn.second)); right_indexes.push_back(saved_block_sample.getPositionByName(tn.second));
} }
size_t size() const { return columns.size(); } size_t size() const { return columns.size(); }
@ -791,7 +808,7 @@ void Join::joinBlockImpl(
ColumnsWithTypeAndName extras; ColumnsWithTypeAndName extras;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
extras.push_back(right_table_keys.getByName(key_names_right.back())); extras.push_back(right_table_keys.getByName(key_names_right.back()));
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, blocklist_sample, extras); AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, saved_block_sample, extras);
std::unique_ptr<IColumn::Offsets> offsets_to_replicate; std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
@ -814,17 +831,14 @@ void Join::joinBlockImpl(
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1); block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1);
/// Add join key columns from right block if needed. /// Add join key columns from right block if needed.
for (size_t i = 0; i < right_table_keys.columns(); ++i) for (size_t i = 0; i < required_right_keys.columns(); ++i)
{ {
const auto & right_key = right_table_keys.getByPosition(i); const auto & right_key = required_right_keys.getByPosition(i);
auto & left_name = key_names_left[i]; const auto & left_name = required_right_keys_sources[i];
if (required_right_keys.count(right_key.name) && !block.has(right_key.name)) const auto & col = block.getByName(left_name);
{ bool is_nullable = nullable_right_side || right_key.type->isNullable();
const auto & col = block.getByName(left_name); block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable));
bool is_nullable = nullable_right_side || right_key.type->isNullable();
block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable));
}
} }
} }
else else
@ -836,22 +850,19 @@ void Join::joinBlockImpl(
const IColumn::Filter & filter = null_map_filter.getData(); const IColumn::Filter & filter = null_map_filter.getData();
/// Add join key columns from right block if needed. /// Add join key columns from right block if needed.
for (size_t i = 0; i < right_table_keys.columns(); ++i) for (size_t i = 0; i < required_right_keys.columns(); ++i)
{ {
const auto & right_key = right_table_keys.getByPosition(i); const auto & right_key = required_right_keys.getByPosition(i);
auto & left_name = key_names_left[i]; const auto & left_name = required_right_keys_sources[i];
if (required_right_keys.count(right_key.name) && !block.has(right_key.name)) const auto & col = block.getByName(left_name);
{ bool is_nullable = nullable_right_side || right_key.type->isNullable();
const auto & col = block.getByName(left_name);
bool is_nullable = nullable_right_side || right_key.type->isNullable();
ColumnPtr thin_column = filterWithBlanks(col.column, filter); ColumnPtr thin_column = filterWithBlanks(col.column, filter);
block.insert(correctNullability({thin_column, col.type, right_key.name}, is_nullable, null_map_filter)); block.insert(correctNullability({thin_column, col.type, right_key.name}, is_nullable, null_map_filter));
if constexpr (is_all_join) if constexpr (is_all_join)
right_keys_to_replicate.push_back(block.getPositionByName(right_key.name)); right_keys_to_replicate.push_back(block.getPositionByName(right_key.name));
}
} }
} }
@ -1012,10 +1023,6 @@ struct AdderNonJoined<ASTTableJoin::Strictness::Any, Mapped>
for (size_t j = 0; j < columns_right.size(); ++j) for (size_t j = 0; j < columns_right.size(); ++j)
{ {
const auto & mapped_column = mapped.block->getByPosition(j).column; const auto & mapped_column = mapped.block->getByPosition(j).column;
#ifndef NDEBUG
if (columns_right[j]->isNullable() != mapped_column->isNullable())
throw Exception("Wrong columns nullability", ErrorCodes::LOGICAL_ERROR);
#endif
columns_right[j]->insertFrom(*mapped_column, mapped.row_num); columns_right[j]->insertFrom(*mapped_column, mapped.row_num);
} }
@ -1033,10 +1040,6 @@ struct AdderNonJoined<ASTTableJoin::Strictness::All, Mapped>
for (size_t j = 0; j < columns_right.size(); ++j) for (size_t j = 0; j < columns_right.size(); ++j)
{ {
const auto & mapped_column = it->block->getByPosition(j).column; const auto & mapped_column = it->block->getByPosition(j).column;
#ifndef NDEBUG
if (columns_right[j]->isNullable() != mapped_column->isNullable())
throw Exception("Wrong columns nullability", ErrorCodes::LOGICAL_ERROR);
#endif
columns_right[j]->insertFrom(*mapped_column, it->row_num); columns_right[j]->insertFrom(*mapped_column, it->row_num);
} }
@ -1062,61 +1065,55 @@ public:
: parent(parent_) : parent(parent_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
{ {
const Names & key_names_left = parent_.table_join->keyNamesLeft(); /// Left or right keys map. In case of collisions it contains any right_key that has data for left one.
std::unordered_map<size_t, size_t> left_to_right_key_position;
/** left_sample_block contains keys and "left" columns. for (size_t i = 0; i < parent.table_join->keyNamesLeft().size(); ++i)
* result_sample_block - keys, "left" columns, and "right" columns.
*/
std::vector<bool> is_left_key(left_sample_block.columns(), false);
std::vector<size_t> key_positions_left;
key_positions_left.reserve(key_names_left.size());
for (const std::string & key : key_names_left)
{ {
size_t key_pos = left_sample_block.getPositionByName(key); const String & left_key_name = parent.table_join->keyNamesLeft()[i];
key_positions_left.push_back(key_pos); const String & right_key_name = parent.table_join->keyNamesRight()[i];
is_left_key[key_pos] = true;
size_t left_key_pos = left_sample_block.getPositionByName(left_key_name);
size_t right_key_pos = parent.saved_block_sample.getPositionByName(right_key_name);
left_to_right_key_position[left_key_pos] = right_key_pos;
} }
const Block & right_sample_block = parent.sample_block_with_columns_to_add; makeResultSampleBlock(left_sample_block);
std::unordered_map<size_t, size_t> left_to_right_key_map; bool join_using = parent.table_join->hasUsing();
makeResultSampleBlock(left_sample_block, right_sample_block, key_positions_left, left_to_right_key_map); for (size_t left_pos = 0; left_pos < left_sample_block.columns(); ++left_pos)
auto nullability_changes = getNullabilityChanges(parent.right_table_keys, result_sample_block,
key_positions_left, left_to_right_key_map);
column_indices_left.reserve(left_sample_block.columns() - key_names_left.size());
column_indices_keys_and_right.reserve(key_names_left.size() + right_sample_block.columns());
key_nullability_changes.reserve(key_positions_left.size());
/// Use right key columns if present. @note left & right key columns could have different nullability.
for (size_t key_pos : key_positions_left)
{ {
/// Here we establish the mapping between key columns of the left- and right-side tables. /// We need right 'x' for 'RIGHT JOIN ... USING(x)'.
/// key_pos index is inserted in the position corresponding to key column in parent.blocks if (join_using && left_to_right_key_position.count(left_pos))
/// (saved blocks of the right-side table) and points to the same key column
/// in the left_sample_block and thus in the result_sample_block.
auto it = left_to_right_key_map.find(key_pos);
if (it != left_to_right_key_map.end())
{ {
column_indices_left.push_back(key_pos); size_t right_key_pos = left_to_right_key_position[left_pos];
key_pos = it->second; setRightIndex(right_key_pos, left_pos);
} }
else
column_indices_keys_and_right.push_back(key_pos); column_indices_left.emplace_back(left_pos);
key_nullability_changes.push_back(nullability_changes.count(key_pos));
} }
for (size_t i = 0; i < left_sample_block.columns(); ++i) for (size_t right_pos = 0; right_pos < parent.saved_block_sample.columns(); ++right_pos)
if (!is_left_key[i]) {
column_indices_left.emplace_back(i); const String & name = parent.saved_block_sample.getByPosition(right_pos).name;
if (!result_sample_block.has(name))
continue;
size_t num_additional_keys = left_to_right_key_map.size(); size_t result_position = result_sample_block.getPositionByName(name);
for (size_t i = left_sample_block.columns(); i < result_sample_block.columns() - num_additional_keys; ++i)
column_indices_keys_and_right.emplace_back(i); /// Don't remap left keys twice. We need only qualified right keys here
if (result_position < left_sample_block.columns())
continue;
setRightIndex(right_pos, result_position);
}
if (column_indices_left.size() + column_indices_right.size() + same_result_keys.size() != result_sample_block.columns())
throw Exception("Error in columns mapping in RIGHT|FULL JOIN. Left: " + toString(column_indices_left.size()) +
", right: " + toString(column_indices_right.size()) +
", same: " + toString(same_result_keys.size()) +
", result: " + toString(result_sample_block.columns()),
ErrorCodes::LOGICAL_ERROR);
} }
String getName() const override { return "NonJoined"; } String getName() const override { return "NonJoined"; }
@ -1137,104 +1134,118 @@ private:
UInt64 max_block_size; UInt64 max_block_size;
Block result_sample_block; Block result_sample_block;
/// Indices of columns in result_sample_block that come from the left-side table (except shared right+left key columns). /// Indices of columns in result_sample_block that come from the left-side table: left_pos == result_pos
ColumnNumbers column_indices_left; std::vector<size_t> column_indices_left;
/// Indices of key columns in result_sample_block or columns that come from the right-side table. /// Indices of columns that come from the right-side table: right_pos -> result_pos
/// Order is significant: it is the same as the order of columns in the blocks of the right-side table that are saved in parent.blocks. std::unordered_map<size_t, size_t> column_indices_right;
ColumnNumbers column_indices_keys_and_right; ///
/// Which key columns need change nullability (right is nullable and left is not or vice versa) std::unordered_map<size_t, size_t> same_result_keys;
std::vector<bool> key_nullability_changes; /// Which right columns (saved in parent) need nullability change before placing them in result block
std::vector<size_t> right_nullability_changes;
std::any position; std::any position;
std::optional<Join::BlockNullmapList::const_iterator> nulls_position; std::optional<Join::BlockNullmapList::const_iterator> nulls_position;
void makeResultSampleBlock(const Block & left_sample_block, const Block & right_sample_block, /// result_sample_block: "left keys", "left" columns, "right" columns, some "right keys"
const std::vector<size_t> & key_positions_left, void makeResultSampleBlock(const Block & left_sample_block)
std::unordered_map<size_t, size_t> & left_to_right_key_map)
{ {
result_sample_block = materializeBlock(left_sample_block); result_sample_block = materializeBlock(left_sample_block);
if (parent.nullable_left_side) if (parent.nullable_left_side)
JoinCommon::convertColumnsToNullable(result_sample_block); JoinCommon::convertColumnsToNullable(result_sample_block);
/// Add columns from the right-side table to the block. for (const ColumnWithTypeAndName & src_column : parent.sample_block_with_columns_to_add)
for (size_t i = 0; i < right_sample_block.columns(); ++i)
{ {
const ColumnWithTypeAndName & src_column = right_sample_block.getByPosition(i);
if (!result_sample_block.has(src_column.name)) if (!result_sample_block.has(src_column.name))
result_sample_block.insert(src_column.cloneEmpty()); result_sample_block.insert(src_column.cloneEmpty());
} }
/// Add join key columns from right block if they has different name. for (auto & required_key : parent.required_right_keys)
for (size_t i = 0; i < parent.right_table_keys.columns(); ++i)
{ {
const auto & right_key = parent.right_table_keys.getByPosition(i); const auto & right_key = parent.saved_block_sample.getByName(required_key.name);
size_t left_key_pos = key_positions_left[i];
if (parent.required_right_keys.count(right_key.name) && !result_sample_block.has(right_key.name)) bool is_nullable = (parent.nullable_right_side && isFull(parent.kind)) || right_key.column->isNullable();
{ result_sample_block.insert(correctNullability({right_key.column, right_key.type, right_key.name}, is_nullable));
const auto & col = result_sample_block.getByPosition(left_key_pos);
bool is_nullable = (parent.nullable_right_side && isFull(parent.kind)) || right_key.type->isNullable();
result_sample_block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable));
size_t right_key_pos = result_sample_block.getPositionByName(right_key.name);
left_to_right_key_map[left_key_pos] = right_key_pos;
}
} }
} }
void setRightIndex(size_t right_pos, size_t result_position)
{
if (!column_indices_right.count(right_pos))
{
column_indices_right[right_pos] = result_position;
if (hasNullabilityChange(right_pos, result_position))
right_nullability_changes.push_back(right_pos);
}
else
same_result_keys[result_position] = column_indices_right[right_pos];
}
bool hasNullabilityChange(size_t right_pos, size_t result_pos) const
{
const auto & src = parent.saved_block_sample.getByPosition(right_pos).column;
const auto & dst = result_sample_block.getByPosition(result_pos).column;
return src->isNullable() != dst->isNullable();
}
Block createBlock() Block createBlock()
{ {
MutableColumns columns_left = columnsForIndex(result_sample_block, column_indices_left); MutableColumns columns_right = parent.saved_block_sample.cloneEmptyColumns();
MutableColumns columns_keys_and_right = columnsForIndex(result_sample_block, column_indices_keys_and_right);
/// Temporary change destination key columns' nullability according to mapped block
changeNullability(columns_keys_and_right, key_nullability_changes);
size_t rows_added = 0; size_t rows_added = 0;
auto fill_callback = [&](auto, auto strictness, auto & map) auto fill_callback = [&](auto, auto strictness, auto & map)
{ {
rows_added = fillColumnsFromMap<strictness>(map, columns_keys_and_right); rows_added = fillColumnsFromMap<strictness>(map, columns_right);
}; };
if (!joinDispatch(parent.kind, parent.strictness, parent.maps, fill_callback)) if (!joinDispatch(parent.kind, parent.strictness, parent.maps, fill_callback))
throw Exception("Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)", ErrorCodes::LOGICAL_ERROR);
fillNullsFromBlocks(columns_keys_and_right, rows_added); fillNullsFromBlocks(columns_right, rows_added);
if (!rows_added) if (!rows_added)
return {}; return {};
/// Revert columns nullability for (size_t pos : right_nullability_changes)
changeNullability(columns_keys_and_right, key_nullability_changes); changeNullability(columns_right[pos]);
Block res = result_sample_block.cloneEmpty(); Block res = result_sample_block.cloneEmpty();
/// @note it's possible to make ColumnConst here and materialize it later /// @note it's possible to make ColumnConst here and materialize it later
for (size_t i = 0; i < columns_left.size(); ++i) for (size_t pos : column_indices_left)
res.getByPosition(column_indices_left[i]).column = columns_left[i]->cloneResized(rows_added); res.getByPosition(pos).column = res.getByPosition(pos).column->cloneResized(rows_added);
for (size_t i = 0; i < columns_keys_and_right.size(); ++i) for (auto & pr : column_indices_right)
res.getByPosition(column_indices_keys_and_right[i]).column = std::move(columns_keys_and_right[i]);
return res;
}
static MutableColumns columnsForIndex(const Block & block, const ColumnNumbers & indices)
{
size_t num_columns = indices.size();
MutableColumns columns;
columns.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{ {
const auto & src_col = block.safeGetByPosition(indices[i]); auto & right_column = columns_right[pr.first];
columns[i] = src_col.type->createColumn(); auto & result_column = res.getByPosition(pr.second).column;
#ifndef NDEBUG
if (result_column->getName() != right_column->getName())
throw Exception("Wrong columns assign in RIGHT|FULL JOIN: " + result_column->getName() +
" " + right_column->getName(), ErrorCodes::LOGICAL_ERROR);
#endif
result_column = std::move(right_column);
} }
return columns; for (auto & pr : same_result_keys)
{
auto & src_column = res.getByPosition(pr.second).column;
auto & dst_column = res.getByPosition(pr.first).column;
if (src_column->isNullable() && !dst_column->isNullable())
{
auto * nullable = checkAndGetColumn<ColumnNullable>(*src_column);
dst_column = nullable->getNestedColumnPtr();
}
else if (!src_column->isNullable() && dst_column->isNullable())
dst_column = makeNullable(src_column);
else
dst_column = src_column;
}
return res;
} }
template <ASTTableJoin::Strictness STRICTNESS, typename Maps> template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
@ -1310,47 +1321,6 @@ private:
} }
} }
} }
static std::unordered_set<size_t> getNullabilityChanges(const Block & right_table_keys, const Block & out_block,
const std::vector<size_t> & key_positions,
const std::unordered_map<size_t, size_t> & left_to_right_key_map)
{
std::unordered_set<size_t> nullability_changes;
for (size_t i = 0; i < key_positions.size(); ++i)
{
size_t key_pos = key_positions[i];
auto it = left_to_right_key_map.find(key_pos);
if (it != left_to_right_key_map.end())
key_pos = it->second;
const auto & dst = out_block.getByPosition(key_pos).column;
const auto & src = right_table_keys.getByPosition(i).column;
if (dst->isNullable() != src->isNullable())
nullability_changes.insert(key_pos);
}
return nullability_changes;
}
static void changeNullability(MutableColumns & columns, const std::vector<bool> & changes_bitmap)
{
/// @note changes_bitmap.size() <= columns.size()
for (size_t i = 0; i < changes_bitmap.size(); ++i)
{
if (changes_bitmap[i])
{
ColumnPtr column = std::move(columns[i]);
if (auto * nullable = checkAndGetColumn<ColumnNullable>(*column))
column = nullable->getNestedColumnPtr();
else
column = makeNullable(column);
columns[i] = (*std::move(column)).mutate();
}
}
}
}; };

View File

@ -283,8 +283,6 @@ private:
/// Names of key columns in right-side table (in the order they appear in ON/USING clause). @note It could contain duplicates. /// Names of key columns in right-side table (in the order they appear in ON/USING clause). @note It could contain duplicates.
const Names & key_names_right; const Names & key_names_right;
/// Names right-side table keys that are needed in result (would be attached after joined columns).
const NameSet required_right_keys;
/// In case of LEFT and FULL joins, if use_nulls, convert right-side columns to Nullable. /// In case of LEFT and FULL joins, if use_nulls, convert right-side columns to Nullable.
bool nullable_right_side; bool nullable_right_side;
@ -319,9 +317,13 @@ private:
Block sample_block_with_columns_to_add; Block sample_block_with_columns_to_add;
/// Block with key columns in the same order they appear in the right-side table (duplicates appear once). /// Block with key columns in the same order they appear in the right-side table (duplicates appear once).
Block right_table_keys; Block right_table_keys;
/// Block with key columns right-side table keys that are needed in result (would be attached after joined columns).
Block required_right_keys;
/// Left table column names that are sources for required_right_keys columns
std::vector<String> required_right_keys_sources;
/// Block as it would appear in the BlockList /// Block as it would appear in the BlockList
Block blocklist_sample; Block saved_block_sample;
Poco::Logger * log; Poco::Logger * log;
@ -340,10 +342,10 @@ private:
*/ */
void setSampleBlock(const Block & block); void setSampleBlock(const Block & block);
/** Take an inserted block and discard everything that does not need to be stored /// Modify (structure) and save right block, @returns pointer to saved block
* Example, remove the keys as they come from the LHS block, but do keep the ASOF timestamps Block * storeRightBlock(const Block & stored_block);
*/ void initRightBlockStructure();
void prepareBlockListStructure(Block & stored_block); void initRequiredRightKeys();
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
void joinBlockImpl( void joinBlockImpl(

View File

@ -1,5 +1,5 @@
2015-12-01 0 0 2015-12-01 0 0
2015-12-02 1 1 2015-12-02 1 1
2015-12-03 0 2 0000-00-00 0 2
2015-12-04 0 3 0000-00-00 0 3
2015-12-05 0 4 0000-00-00 0 4

View File

@ -0,0 +1,60 @@
1 2 0 0
0 0 1 2
-
0 0 1 2
-
1 2 0 0
0 0 1 2
-
0 0 1 2
-
1
0
-
0
-
1
0
-
0
-
0
1
-
1
-
0
1
-
1
-
1 2 \N \N
\N \N 8 2
-
\N \N 1 2
-
1 2 \N \N
\N \N 1 2
-
\N \N 1 2
-
1
\N
-
\N
-
1
\N
-
\N
-
\N
8
-
1
-
\N
1
-
1
-

View File

@ -0,0 +1,63 @@
SET join_use_nulls = 0;
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SET join_use_nulls = 1;
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT * FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT foo.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.a = bar.b) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo FULL JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';
SELECT bar.a FROM (SELECT 1 AS a, 2 AS b) AS foo RIGHT JOIN (SELECT 1 AS a, 2 AS b) AS bar ON (foo.b = bar.a) AND (foo.b = bar.b);
SELECT '-';