appears to work and get the correct timestamps

This commit is contained in:
Martijn Bakker 2019-03-31 22:14:43 +01:00
parent 4a7393cfc6
commit 7ea03f6fa7

View File

@ -420,6 +420,8 @@ namespace
if (emplace_result.isInserted())
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
std::cout << "inserting rhs block=" << stored_block->dumpStructure() << std::endl;
time_series_map->insert(asof_column, stored_block, i, pool);
}
};
@ -485,7 +487,6 @@ namespace
bool Join::insertFromBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
if (empty())
throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR);
@ -549,6 +550,8 @@ bool Join::insertFromBlock(const Block & block)
}
}
LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure());
size_t size = stored_block->columns();
/// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them.
@ -586,7 +589,7 @@ public:
AddedColumns(const Block & sample_block_with_columns_to_add,
const Block & block_with_columns_to_add,
const Block & block, size_t num_columns_to_skip)
const Block & block)
{
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
@ -599,10 +602,31 @@ public:
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i);
/// Don't insert column if it's in left block or not explicitly required.
if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name))
addColumn(src_column, num_columns_to_skip + i);
if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) {
addColumn(src_column);
}
}
}
void addColumn(const ColumnWithTypeAndName & src_column)
{
std::cout << "adding column from src=" << src_column.dumpStructure() << std::endl;
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name);
}
void fillRightIndices(const Block& rhs_block)
{
for(auto& tn : type_name) {
right_indexes.push_back(rhs_block.getPositionByName(tn.second));
}
for(unsigned i = 0; i < right_indexes.size(); i++) {
std::cout << "ri i=" << i << " ri=" << right_indexes[i] << std::endl;
}
loaded = true;
}
size_t size() const { return columns.size(); }
@ -613,39 +637,29 @@ public:
void appendFromBlock(const Block & block, size_t row_num)
{
std::cout << "appendFromBlock block=" << block.dumpStructure() << " row_num=" << row_num << std::endl;
if(!loaded)
return;
std::cout << "appendFromBlock block=" << block.dumpStructure() << " row_num=" << row_num << " right_indexes=[";
for (size_t j = 0; j < right_indexes.size(); ++j) {
std::cout << "right_index=" << right_indexes[j] << std::endl;
for (size_t j = 0; j < columns.size(); ++j) {
std::cout << right_indexes[j] << " ";
columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num);
}
}
std::cout << "]" << std::endl;
}
void appendDefaultRow()
{
for (size_t j = 0; j < right_indexes.size(); ++j)
for (size_t j = 0; j < columns.size(); ++j)
columns[j]->insertDefault();
}
void addExtraColumn(const ColumnWithTypeAndName & src_column)
{
addColumn(src_column, columns.size());
}
private:
void addColumn(const ColumnWithTypeAndName & src_column, size_t idx)
{
std::cout << "adding column from src=" << src_column.dumpStructure() << " idx=" << idx << std::endl;
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name);
right_indexes.push_back(idx);
}
TypeAndNames type_name;
MutableColumns columns;
bool loaded = false;
std::vector<size_t> right_indexes;
};
@ -838,17 +852,22 @@ void Join::joinBlockImpl(
* For FULL/RIGHT JOIN, the saved blocks contain keys;
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
*/
size_t num_columns_to_skip = 0;
if constexpr (right_or_full)
num_columns_to_skip = keys_size;
// size_t num_columns_to_skip = 0;
// if constexpr (right_or_full)
// num_columns_to_skip = keys_size;
/// Add new columns to the block.
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip);
LOG_DEBUG(log, "joinBlockImpl - sample_block_with_columns_to_add" << sample_block_with_columns_to_add.dumpStructure());
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
// Add the last key column which is the ASOF key
added.addExtraColumn(sample_block_with_keys.safeGetByPosition(sample_block_with_keys.columns()-1));
added.addColumn(sample_block_with_keys.safeGetByPosition(sample_block_with_keys.columns()-1));
if(!blocks.empty()) {
added.fillRightIndices(*blocks.begin());
} else {
LOG_DEBUG(log, "unable to fill right index of added columns");
}
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;