Merge pull request #4774 from Gladdy/martijn-asof-join

Request for feedback on implementation of ASOF join
This commit is contained in:
Artem Zuikov 2019-03-29 13:19:31 +03:00 committed by GitHub
commit 03cd41fbc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 289 additions and 20 deletions

View File

@ -36,6 +36,11 @@ struct HashMethodOneNumber
vec = key_columns[0]->getRawData().data;
}
HashMethodOneNumber(const IColumn * column)
{
vec = column->getRawData().data;
}
/// Creates context. Method is called once and result context is used in all threads.
using Base::createContext; /// (const HashMethodContext::Settings &) -> HashMethodContextPtr

View File

@ -150,6 +150,25 @@ Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_siz
return Type::hashed;
}
static const IColumn * extractAsofColumn(const ColumnRawPtrs & key_columns)
{
return key_columns.back();
}
template<typename KeyGetter, ASTTableJoin::Strictness STRICTNESS>
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes)
{
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
auto key_column_copy = key_columns;
auto key_size_copy = key_sizes;
key_column_copy.pop_back();
key_size_copy.pop_back();
return KeyGetter(key_column_copy, key_size_copy, nullptr);
}
else
return KeyGetter(key_columns, key_sizes, nullptr);
}
template <Join::Type type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;
@ -249,6 +268,7 @@ size_t Join::getTotalByteCount() const
void Join::setSampleBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "setSampleBlock: " << block.dumpStructure());
if (!empty())
return;
@ -273,8 +293,35 @@ void Join::setSampleBlock(const Block & block)
key_columns[i] = &static_cast<const ColumnNullable &>(*key_columns[i]).getNestedColumn();
}
/// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes));
if (strictness == ASTTableJoin::Strictness::Asof)
{
if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner)
throw Exception("ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED);
if (key_columns.back()->sizeOfValueIfFixed() != sizeof(ASOFTimeType))
{
std::string msg = "ASOF join column needs to have size ";
msg += std::to_string(sizeof(ASOFTimeType));
throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD);
}
key_columns.pop_back();
if (key_columns.empty())
throw Exception("ASOF join cannot be done without a joining column", ErrorCodes::LOGICAL_ERROR);
/// this is going to set up the appropriate hash table for the direct lookup part of the join
/// However, this does not depend on the size of the asof join key (as that goes into the BST)
/// Therefore, add it back in such that it can be extracted appropriately from the full stored
/// key_columns and key_sizes
init(chooseMethod(key_columns, key_sizes));
key_sizes.push_back(sizeof(ASOFTimeType));
}
else
{
/// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes));
}
sample_block_with_columns_to_add = materializeBlock(block);
@ -310,6 +357,33 @@ void Join::setSampleBlock(const Block & block)
convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i));
}
void Join::TSRowRef::insert(Join::ASOFTimeType t, const Block * block, size_t row_num)
{
ts.insert(std::pair(t, RowRef(block, row_num)));
}
std::string Join::TSRowRef::dumpStructure() const
{
std::stringstream ss;
for (auto const& x : ts)
{
ss << "(t=" << x.first << " row_num=" << x.second.row_num << " ptr=" << x.second.block << "),";
}
return ss.str();
}
size_t Join::TSRowRef::size() const
{
return ts.size();
}
std::optional<std::pair<Join::ASOFTimeType, Join::RowRef>> Join::TSRowRef::findAsof(Join::ASOFTimeType t) const
{
auto it = ts.upper_bound(t);
if (it == ts.cbegin())
return {};
return *(--it);
}
namespace
{
@ -358,20 +432,48 @@ namespace
}
};
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
{
template<typename AsofGetter>
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, AsofGetter & asof_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted())
{
time_series_map = new (time_series_map) typename Map::mapped_type();
}
auto k = asof_getter.getKey(i, pool);
time_series_map->insert(k, stored_block, i);
// std::cout << "inserted key into time series map=" << k << " result=" << time_series_map->dumpStructure() << std::endl;
}
};
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
KeyGetter key_getter(key_columns, key_sizes, nullptr);
const IColumn * asof_column [[maybe_unused]] = nullptr;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
asof_column = extractAsofColumn(key_columns);
auto key_getter = createKeyGetter<KeyGetter, STRICTNESS>(key_columns, key_sizes);
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
continue;
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
auto asof_getter = Join::AsofGetterType(asof_column);
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, asof_getter, stored_block, i, pool);
} else
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
}
}
@ -409,10 +511,10 @@ namespace
}
}
bool Join::insertFromBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
if (empty())
throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR);
@ -537,6 +639,7 @@ public:
columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num);
}
void appendDefaultRow()
{
for (size_t j = 0; j < right_indexes.size(); ++j)
@ -575,6 +678,20 @@ void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added,
}
};
template <typename Map>
bool addFoundRowAsof(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset [[maybe_unused]], Join::ASOFTimeType asof_key)
{
if (auto v = mapped.findAsof(asof_key))
{
std::pair<Join::ASOFTimeType, Join::RowRef> res = *v;
// std::cout << "Adder::addFound" << " to_add" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << " found=" << res.first << std::endl;
added.appendFromBlock(*res.second.block, res.second.row_num);
return true;
}
// std::cout << "Adder::addFound" << " not found in map" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << std::endl;
return false;
}
template <bool _add_missing>
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
{
@ -585,20 +702,28 @@ void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & cur
}
}
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <bool _add_missing, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool _has_null_map>
std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
const Map & map, size_t rows, KeyGetter & key_getter,
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter)
{
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Offset current_offset = 0;
Arena pool;
const IColumn * asof_column [[maybe_unused]] = nullptr;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
asof_column = extractAsofColumn(key_columns);
auto key_getter = createKeyGetter<KeyGetter, STRICTNESS>(key_columns, key_sizes);
IColumn::Offset current_offset = 0;
for (size_t i = 0; i < rows; ++i)
{
if (_has_null_map && (*null_map)[i])
@ -611,10 +736,28 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if (find_result.isFound())
{
filter[i] = 1;
auto & mapped = find_result.getMapped();
mapped.setUsed();
addFoundRow<STRICTNESS, Map>(mapped, added_columns, current_offset);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
Join::AsofGetterType asof_getter(asof_column);
auto asof_key = asof_getter.getKey(i, pool);
bool actually_found = addFoundRowAsof<Map>(mapped, added_columns, current_offset, asof_key);
if (actually_found)
{
filter[i] = 1;
mapped.setUsed();
}
else
addNotFoundRow<_add_missing>(added_columns, current_offset);
}
else
{
filter[i] = 1;
mapped.setUsed();
addFoundRow<STRICTNESS, Map>(mapped, added_columns, current_offset);
}
}
else
addNotFoundRow<_add_missing>(added_columns, current_offset);
@ -635,14 +778,13 @@ IColumn::Filter joinRightColumns(
constexpr bool left_or_full = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
IColumn::Filter filter(rows, 0);
KeyGetter key_getter(key_columns, key_sizes, nullptr);
if (null_map)
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_getter, added_columns, null_map, filter);
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
else
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_getter, added_columns, null_map, filter);
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
return filter;
}
@ -744,7 +886,7 @@ void Join::joinBlockImpl(
auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof)
{
/// Some trash to represent IColumn::Filter as ColumnUInt8 needed for ColumnNullable::applyNullMap()
auto null_map_filter_ptr = ColumnUInt8::create();
@ -979,9 +1121,8 @@ void Join::joinGet(Block & block, const String & column_name) const
void Join::joinBlock(Block & block, const Names & key_names_left, const NamesAndTypesList & columns_added_by_join) const
{
// std::cerr << "joinBlock: " << block.dumpStructure() << "\n";
std::shared_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
checkTypesOfKeys(block, key_names_left, sample_block_with_keys);
@ -1057,6 +1198,14 @@ struct AdderNonJoined<ASTTableJoin::Strictness::All, Mapped>
}
};
template <typename Mapped>
struct AdderNonJoined<ASTTableJoin::Strictness::Asof, Mapped>
{
static void add(const Mapped & /*mapped*/, size_t & /*rows_added*/, MutableColumns & /*columns_right*/)
{
// If we have a leftover match in the right hand side, not required to join because we are only support asof left/inner
}
};
/// Stream from not joined earlier rows of the right table.
class NonJoinedBlockInputStream : public IBlockInputStream

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <shared_mutex>
#include <Parsers/ASTTablesInSelectQuery.h>
@ -150,6 +151,21 @@ public:
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
/// Map for a time series
using ASOFTimeType = UInt32;
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<ASOFTimeType, ASOFTimeType, ASOFTimeType, false>;
struct TSRowRef
{
// TODO use the arena allocator to get memory for this
// This would require ditching std::map because std::allocator is incompatible with the arena allocator
std::map<ASOFTimeType, RowRef> ts;
TSRowRef() {}
void insert(ASOFTimeType t, const Block * block, size_t row_num);
std::optional<std::pair<ASOFTimeType, RowRef>> findAsof(ASOFTimeType t) const;
std::string dumpStructure() const;
size_t size() const;
};
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
@ -181,7 +197,6 @@ public:
bool getUsed() const { return true; }
};
/// Different types of keys for maps.
#define APPLY_FOR_JOIN_VARIANTS(M) \
M(key8) \
@ -282,6 +297,7 @@ public:
using MapsAnyFull = MapsTemplate<WithFlags<true, false, RowRef>>;
using MapsAnyFullOverwrite = MapsTemplate<WithFlags<true, true, RowRef>>;
using MapsAllFull = MapsTemplate<WithFlags<true, false, RowRefList>>;
using MapsAsof = MapsTemplate<WithFlags<false, false, TSRowRef>>;
template <ASTTableJoin::Kind KIND>
struct KindTrait
@ -300,7 +316,7 @@ public:
template <ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness, bool overwrite>
using Map = typename MapGetterImpl<KindTrait<kind>::fill_right, strictness, overwrite>::Map;
static constexpr std::array<ASTTableJoin::Strictness, 2> STRICTNESSES = {ASTTableJoin::Strictness::Any, ASTTableJoin::Strictness::All};
static constexpr std::array<ASTTableJoin::Strictness, 3> STRICTNESSES = {ASTTableJoin::Strictness::Any, ASTTableJoin::Strictness::All, ASTTableJoin::Strictness::Asof};
static constexpr std::array<ASTTableJoin::Kind, 4> KINDS
= {ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Inner, ASTTableJoin::Kind::Full, ASTTableJoin::Kind::Right};
@ -377,7 +393,7 @@ private:
*/
BlocksList blocks;
std::variant<MapsAny, MapsAnyOverwrite, MapsAll, MapsAnyFull, MapsAnyFullOverwrite, MapsAllFull> maps;
std::variant<MapsAny, MapsAnyOverwrite, MapsAll, MapsAnyFull, MapsAnyFullOverwrite, MapsAllFull, MapsAsof> maps;
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
Arena pool;
@ -454,4 +470,10 @@ struct Join::MapGetterImpl<true, ASTTableJoin::Strictness::All, false>
using Map = MapsAllFull;
};
template <bool fill_right>
struct Join::MapGetterImpl<fill_right, ASTTableJoin::Strictness::Asof, false>
{
using Map = MapsAsof;
};
}

View File

@ -145,6 +145,9 @@ void ASTTableJoin::formatImplBeforeTable(const FormatSettings & settings, Format
case Strictness::All:
settings.ostr << "ALL ";
break;
case Strictness::Asof:
settings.ostr << "ASOF ";
break;
}
}

View File

@ -75,7 +75,8 @@ struct ASTTableJoin : public IAST
{
Unspecified,
Any, /// If there are many suitable rows to join, use any from them (also known as unique JOIN).
All /// If there are many suitable rows to join, use all of them and replicate rows of "left" table (usual semantic of JOIN).
All, /// If there are many suitable rows to join, use all of them and replicate rows of "left" table (usual semantic of JOIN).
Asof, /// For the last JOIN column, pick the latest value
};
/// Join method.

View File

@ -1113,6 +1113,7 @@ const char * ParserAlias::restricted_keywords[] =
"INNER",
"FULL",
"CROSS",
"ASOF",
"JOIN",
"GLOBAL",
"ANY",

View File

@ -135,6 +135,8 @@ bool ParserTablesInSelectQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expec
table_join->strictness = ASTTableJoin::Strictness::Any;
else if (ParserKeyword("ALL").ignore(pos))
table_join->strictness = ASTTableJoin::Strictness::All;
else if (ParserKeyword("ASOF").ignore(pos))
table_join->strictness = ASTTableJoin::Strictness::Asof;
else
table_join->strictness = ASTTableJoin::Strictness::Unspecified;

View File

@ -333,6 +333,10 @@ private:
columns[j]->insertFrom(*it->getSecond().block->getByPosition(column_indices[j]).column.get(), it->getSecond().row_num);
++rows_added;
}
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
throw Exception("ASOF join storage is not implemented yet", ErrorCodes::NOT_IMPLEMENTED);
}
else
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(it->getSecond()); current != nullptr;
current = current->next)

View File

@ -0,0 +1,29 @@
1 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
2 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
2 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
3 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:03 3 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:04 4 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:05 5 0 0000-00-00 00:00:00 0
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2

View File

@ -0,0 +1,22 @@
USE test;
DROP TABLE IF EXISTS A;
DROP TABLE IF EXISTS B;
CREATE TABLE A(k UInt32, t DateTime, a Float64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO A(k,t,a) VALUES (1,1,1),(1,2,2),(1,3,3),(1,4,4),(1,5,5); -- multiple joined values
INSERT INTO A(k,t,a) VALUES (2,1,1),(2,2,2),(2,3,3),(2,4,4),(2,5,5); -- one joined value
INSERT INTO A(k,t,a) VALUES (3,1,1),(3,2,2),(3,3,3),(3,4,4),(3,5,5); -- no joined values
CREATE TABLE B(k UInt32, t DateTime, b Float64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,2,2),(1,4,4);
INSERT INTO B(k,t,b) VALUES (2,3,3);
SELECT A.k, toString(A.t, 'UTC'), A.a, B.b, toString(B.t, 'UTC'), B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
SELECT A.k, toString(A.t, 'UTC'), A.a, B.b, toString(B.t, 'UTC'), B.k FROM A ASOF INNER JOIN B ON A.k == B.k AND A.t == B.t ORDER BY (A.k, A.t);
SELECT A.k, toString(A.t, 'UTC'), A.a, B.b, toString(B.t, 'UTC'), B.k FROM A ASOF JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE A;
DROP TABLE B;

View File

@ -0,0 +1,14 @@
1 1970-01-01 00:00:05 1 1.5 2
1 1970-01-01 00:00:06 1 1.51 2
1 1970-01-01 00:00:10 11 11.5 12
1 1970-01-01 00:00:11 11 11.51 12
1 1970-01-01 00:00:15 5 5.5 6
1 1970-01-01 00:00:16 5 5.6 6
1 1970-01-01 00:00:20 7 7.5 8
2 1970-01-01 00:00:05 11 2.5 12
2 1970-01-01 00:00:06 11 2.51 12
2 1970-01-01 00:00:10 21 12.5 22
2 1970-01-01 00:00:11 21 12.51 22
2 1970-01-01 00:00:15 5 6.5 6
2 1970-01-01 00:00:16 5 5.6 6
2 1970-01-01 00:00:20 17 8.5 18

View File

@ -0,0 +1,17 @@
USE test;
DROP TABLE IF EXISTS md;
DROP TABLE IF EXISTS tv;
CREATE TABLE md(key UInt32, t DateTime, bid Float64, ask Float64) ENGINE = MergeTree() ORDER BY (key, t);
INSERT INTO test.md(key,t,bid,ask) VALUES (1,20,7,8),(1,5,1,2),(1,10,11,12),(1,15,5,6);
INSERT INTO test.md(key,t,bid,ask) VALUES (2,20,17,18),(2,5,11,12),(2,10,21,22),(2,15,5,6);
CREATE TABLE tv(key UInt32, t DateTime, tv Float64) ENGINE = MergeTree() ORDER BY (key, t);
INSERT INTO test.tv(key,t,tv) VALUES (1,5,1.5),(1,6,1.51),(1,10,11.5),(1,11,11.51),(1,15,5.5),(1,16,5.6),(1,20,7.5);
INSERT INTO test.tv(key,t,tv) VALUES (2,5,2.5),(2,6,2.51),(2,10,12.5),(2,11,12.51),(2,15,6.5),(2,16,5.6),(2,20,8.5);
SELECT tv.key, toString(tv.t, 'UTC'), md.bid, tv.tv, md.ask FROM tv ASOF LEFT JOIN md USING(key,t) ORDER BY (tv.key, tv.t);
DROP TABLE md;
DROP TABLE tv;