include the asof column in the block stored. overall not working yet but at least doesnt crash anymore

This commit is contained in:
Martijn Bakker 2019-03-31 20:03:57 +01:00
parent 11997ed772
commit 4a7393cfc6
3 changed files with 53 additions and 6 deletions

View File

@ -534,8 +534,15 @@ bool Join::insertFromBlock(const Block & block)
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section
/// Remove the key columns from stored_block, as they are not needed.
/// However, do not erase the ASOF column if this is an asof join
for (const auto & name : key_names_right)
{
if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back())
{
LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name);
break; // this is the last column so break is OK
}
if (!erased.count(name))
stored_block->erase(stored_block->getPositionByName(name));
erased.insert(name);
@ -606,8 +613,13 @@ public:
void appendFromBlock(const Block & block, size_t row_num)
{
for (size_t j = 0; j < right_indexes.size(); ++j)
std::cout << "appendFromBlock block=" << block.dumpStructure() << " row_num=" << row_num << std::endl;
for (size_t j = 0; j < right_indexes.size(); ++j) {
std::cout << "right_index=" << right_indexes[j] << std::endl;
columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num);
}
}
@ -616,19 +628,25 @@ public:
for (size_t j = 0; j < right_indexes.size(); ++j)
columns[j]->insertDefault();
}
void addExtraColumn(const ColumnWithTypeAndName & src_column)
{
addColumn(src_column, columns.size());
}
private:
TypeAndNames type_name;
MutableColumns columns;
std::vector<size_t> right_indexes;
void addColumn(const ColumnWithTypeAndName & src_column, size_t idx)
{
std::cout << "adding column from src=" << src_column.dumpStructure() << " idx=" << idx << std::endl;
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name);
right_indexes.push_back(idx);
}
TypeAndNames type_name;
MutableColumns columns;
std::vector<size_t> right_indexes;
};
template <ASTTableJoin::Strictness STRICTNESS, typename Map>
@ -825,17 +843,26 @@ void Join::joinBlockImpl(
num_columns_to_skip = keys_size;
/// Add new columns to the block.
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
// Add the last key column which is the ASOF key
added.addExtraColumn(sample_block_with_keys.safeGetByPosition(sample_block_with_keys.columns()-1));
}
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(
type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
LOG_DEBUG(log, "joinBlockImpl - switchJoinRightColumns");
for (size_t i = 0; i < added.size(); ++i)
block.insert(added.moveColumn(i));
LOG_DEBUG(log, "joinBlockImpl - after insert: " << block.dumpStructure());
/// Filter & insert missing rows
auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join);

View File

@ -0,0 +1,5 @@
1 101 1 0 0 0
1 102 2 2 102 1
1 103 3 2 102 1
1 104 4 4 104 1
1 105 5 4 104 1

View File

@ -0,0 +1,15 @@
USE test;
DROP TABLE IF EXISTS A;
DROP TABLE IF EXISTS B;
CREATE TABLE A(k UInt32, t UInt32, a UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO A(k,t,a) VALUES (1,101,1),(1,102,2),(1,103,3),(1,104,4),(1,105,5);
CREATE TABLE B(k UInt32, t UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE A;
DROP TABLE B;