Merge pull request #4867 from Gladdy/martijn-asof-working-bt

Return the correct timestamp for the right-hand table of the ASOF join
This commit is contained in:
Artem Zuikov 2019-04-01 15:36:27 +03:00 committed by GitHub
commit b6b9be7b8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 119 additions and 51 deletions

View File

@ -331,6 +331,9 @@ void Join::setSampleBlock(const Block & block)
sample_block_with_columns_to_add = materializeBlock(block);
blocklist_sample = Block(block.getColumnsWithTypeAndName());
prepareBlockListStructure(blocklist_sample);
/// Move from `sample_block_with_columns_to_add` key columns to `sample_block_with_keys`, keeping the order.
size_t pos = 0;
while (pos < sample_block_with_columns_to_add.columns())
@ -482,10 +485,47 @@ namespace
}
}
void Join::prepareBlockListStructure(Block & stored_block)
{
if (isRightOrFull(kind))
{
/** Move the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream will expect.
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block.getPositionByName(name);
ColumnWithTypeAndName col = stored_block.safeGetByPosition(pos);
stored_block.erase(pos);
stored_block.insert(key_num, std::move(col));
++key_num;
}
}
else
{
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section
/// Remove the key columns from stored_block, as they are not needed.
/// However, do not erase the ASOF column if this is an asof join
for (const auto & name : key_names_right)
{
if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back())
{
LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name);
break; // this is the last column so break is OK
}
if (!erased.count(name))
stored_block.erase(stored_block.getPositionByName(name));
erased.insert(name);
}
}
}
bool Join::insertFromBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
if (empty())
throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR);
@ -514,33 +554,9 @@ bool Join::insertFromBlock(const Block & block)
blocks.push_back(block);
Block * stored_block = &blocks.back();
if (isRightOrFull(kind))
{
/** Move the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream will expect.
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block->getPositionByName(name);
ColumnWithTypeAndName col = stored_block->safeGetByPosition(pos);
stored_block->erase(pos);
stored_block->insert(key_num, std::move(col));
++key_num;
}
}
else
{
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section
prepareBlockListStructure(*stored_block);
/// Remove the key columns from stored_block, as they are not needed.
for (const auto & name : key_names_right)
{
if (!erased.count(name))
stored_block->erase(stored_block->getPositionByName(name));
erased.insert(name);
}
}
LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure());
size_t size = stored_block->columns();
@ -579,7 +595,9 @@ public:
AddedColumns(const Block & sample_block_with_columns_to_add,
const Block & block_with_columns_to_add,
const Block & block, size_t num_columns_to_skip)
const Block & block,
const Block & blocklist_sample,
const ColumnsWithTypeAndName & extras)
{
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
@ -593,8 +611,14 @@ public:
/// Don't insert column if it's in left block or not explicitly required.
if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name))
addColumn(src_column, num_columns_to_skip + i);
addColumn(src_column);
}
for (auto & extra : extras)
addColumn(extra);
for (auto & tn : type_name)
right_indexes.push_back(blocklist_sample.getPositionByName(tn.second));
}
size_t size() const { return columns.size(); }
@ -622,12 +646,11 @@ private:
MutableColumns columns;
std::vector<size_t> right_indexes;
void addColumn(const ColumnWithTypeAndName & src_column, size_t idx)
void addColumn(const ColumnWithTypeAndName & src_column)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name);
right_indexes.push_back(idx);
}
};
@ -819,14 +842,12 @@ void Join::joinBlockImpl(
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
* For FULL/RIGHT JOIN, the saved blocks contain keys;
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column
*/
size_t num_columns_to_skip = 0;
if constexpr (right_or_full)
num_columns_to_skip = keys_size;
/// Add new columns to the block.
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip);
ColumnsWithTypeAndName extras;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
extras.push_back(sample_block_with_keys.getByName(key_names_right.back()));
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, blocklist_sample, extras);
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
@ -837,7 +858,6 @@ void Join::joinBlockImpl(
block.insert(added.moveColumn(i));
/// Filter & insert missing rows
auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof)

View File

@ -377,6 +377,9 @@ private:
/// Block with key columns in the same order they appear in the right-side table.
Block sample_block_with_keys;
/// Block as it would appear in the BlockList
Block blocklist_sample;
Poco::Logger * log;
/// Limits for maximum map size.
@ -393,6 +396,11 @@ private:
void init(Type type_);
/** Take an inserted block and discard everything that does not need to be stored
* Example, remove the keys as they come from the LHS block, but do keep the ASOF timestamps
*/
void prepareBlockListStructure(Block & stored_block);
/// Throw an exception if blocks have different types of key columns.
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right) const;

View File

@ -0,0 +1,15 @@
1 101 1 0 0 0
1 102 2 2 102 1
1 103 3 2 102 1
1 104 4 4 104 1
1 105 5 4 104 1
1 101 1 0 0 0
1 102 2 2 102 1
1 103 3 2 102 1
1 104 4 4 104 1
1 105 5 4 104 1
1 101 1 0 0 0
1 102 2 2 102 1
1 103 3 2 102 1
1 104 4 4 104 1
1 105 5 4 104 1

View File

@ -0,0 +1,25 @@
USE test;
DROP TABLE IF EXISTS A;
DROP TABLE IF EXISTS B;
CREATE TABLE A(k UInt32, t UInt32, a UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO A(k,t,a) VALUES (1,101,1),(1,102,2),(1,103,3),(1,104,4),(1,105,5);
CREATE TABLE B(k UInt32, t UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE B;
CREATE TABLE B(t UInt32, k UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE B;
CREATE TABLE B(k UInt32, b UInt64, t UInt32) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE B;
DROP TABLE A;

View File

@ -1,29 +1,29 @@
1 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1
2 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
2 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2
3 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:03 3 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:04 4 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:05 5 0 0000-00-00 00:00:00 0
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2