2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/StorageJoin.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
#include <Storages/StorageFactory.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/Join.h>
|
2018-11-30 14:49:35 +00:00
|
|
|
#include <Parsers/ASTCreateQuery.h>
|
2019-03-23 22:45:28 +00:00
|
|
|
#include <Parsers/ASTSetQuery.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
#include <Parsers/ASTIdentifier.h>
|
2018-11-30 14:49:35 +00:00
|
|
|
#include <Core/ColumnNumbers.h>
|
2019-01-23 14:48:50 +00:00
|
|
|
#include <DataStreams/IBlockInputStream.h>
|
2018-11-30 14:49:35 +00:00
|
|
|
#include <DataTypes/NestedUtils.h>
|
2019-07-02 14:38:31 +00:00
|
|
|
#include <Interpreters/joinDispatch.h>
|
2019-09-09 19:43:37 +00:00
|
|
|
#include <Interpreters/AnalyzedJoin.h>
|
2019-08-21 02:28:04 +00:00
|
|
|
#include <Common/assert_cast.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
|
2018-03-11 00:15:26 +00:00
|
|
|
#include <Poco/String.h> /// toLower
|
2018-06-09 16:09:37 +00:00
|
|
|
#include <Poco/File.h>
|
2015-01-27 21:24:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2019-08-05 14:03:14 +00:00
|
|
|
extern const int UNSUPPORTED_JOIN_KEYS;
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const int NO_SUCH_COLUMN_IN_TABLE;
|
|
|
|
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
|
2017-12-30 00:36:06 +00:00
|
|
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
|
|
|
extern const int BAD_ARGUMENTS;
|
2016-01-11 21:46:36 +00:00
|
|
|
}
|
|
|
|
|
2015-01-27 21:24:24 +00:00
|
|
|
StorageJoin::StorageJoin(
|
2019-10-25 19:07:47 +00:00
|
|
|
const String & relative_path_,
|
2019-07-09 15:40:21 +00:00
|
|
|
const String & database_name_,
|
|
|
|
const String & table_name_,
|
2017-04-01 07:20:54 +00:00
|
|
|
const Names & key_names_,
|
2018-11-30 14:49:35 +00:00
|
|
|
bool use_nulls_,
|
|
|
|
SizeLimits limits_,
|
|
|
|
ASTTableJoin::Kind kind_,
|
|
|
|
ASTTableJoin::Strictness strictness_,
|
2018-12-30 15:54:45 +00:00
|
|
|
const ColumnsDescription & columns_,
|
2019-08-24 21:20:20 +00:00
|
|
|
const ConstraintsDescription & constraints_,
|
2019-10-25 19:07:47 +00:00
|
|
|
bool overwrite,
|
|
|
|
const Context & context_)
|
|
|
|
: StorageSetOrJoinBase{relative_path_, database_name_, table_name_, columns_, constraints_, context_}
|
2018-11-30 14:49:35 +00:00
|
|
|
, key_names(key_names_)
|
|
|
|
, use_nulls(use_nulls_)
|
|
|
|
, limits(limits_)
|
|
|
|
, kind(kind_)
|
|
|
|
, strictness(strictness_)
|
2015-01-27 21:24:24 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
for (const auto & key : key_names)
|
2018-03-13 15:00:28 +00:00
|
|
|
if (!getColumns().hasPhysical(key))
|
2018-05-07 02:01:11 +00:00
|
|
|
throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-09-09 19:43:37 +00:00
|
|
|
table_join = std::make_shared<AnalyzedJoin>(limits, use_nulls, kind, strictness, key_names);
|
2019-09-16 12:37:46 +00:00
|
|
|
join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns(), overwrite);
|
2017-04-01 07:20:54 +00:00
|
|
|
restore();
|
2015-01-27 21:24:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-08-27 20:43:08 +00:00
|
|
|
void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
|
2018-06-09 16:09:37 +00:00
|
|
|
{
|
|
|
|
Poco::File(path).remove(true);
|
|
|
|
Poco::File(path).createDirectories();
|
2018-06-09 18:17:27 +00:00
|
|
|
Poco::File(path + "tmp/").createDirectories();
|
2018-06-09 16:09:37 +00:00
|
|
|
|
|
|
|
increment = 0;
|
2019-09-16 12:37:46 +00:00
|
|
|
join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns());
|
2018-08-10 04:02:56 +00:00
|
|
|
}
|
2018-06-09 16:09:37 +00:00
|
|
|
|
|
|
|
|
2019-12-19 15:50:28 +00:00
|
|
|
HashJoinPtr StorageJoin::getJoin(std::shared_ptr<AnalyzedJoin> analyzed_join) const
|
2015-01-28 02:37:05 +00:00
|
|
|
{
|
2019-12-25 17:32:55 +00:00
|
|
|
if (kind != analyzed_join->kind() || strictness != analyzed_join->strictness())
|
2019-12-30 20:24:02 +00:00
|
|
|
throw Exception("Table " + backQuote(table_name) + " has incompatible type of JOIN.", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
|
2019-12-19 15:50:28 +00:00
|
|
|
|
2019-12-30 20:08:03 +00:00
|
|
|
if ((analyzed_join->forceNullableRight() && !use_nulls) ||
|
|
|
|
(!analyzed_join->forceNullableRight() && isLeftOrFull(analyzed_join->kind()) && use_nulls))
|
2019-12-30 20:24:02 +00:00
|
|
|
throw Exception("Table " + backQuote(table_name) + " needs the same join_use_nulls setting as present in LEFT or FULL JOIN.",
|
2019-12-30 19:43:40 +00:00
|
|
|
ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
|
|
|
|
|
2019-12-19 15:50:28 +00:00
|
|
|
/// TODO: check key columns
|
|
|
|
|
2019-12-19 20:37:10 +00:00
|
|
|
/// Some HACK to remove wrong names qualifiers: table.column -> column.
|
|
|
|
analyzed_join->setRightKeys(key_names);
|
|
|
|
|
2019-12-19 15:50:28 +00:00
|
|
|
HashJoinPtr join_clone = std::make_shared<Join>(analyzed_join, getSampleBlock().sortColumns());
|
|
|
|
join_clone->reuseJoinedData(*join);
|
|
|
|
return join_clone;
|
2015-01-28 02:37:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-09-09 19:43:37 +00:00
|
|
|
void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block); }
|
2018-08-10 04:02:56 +00:00
|
|
|
size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
|
2017-01-14 09:00:19 +00:00
|
|
|
|
2017-12-30 00:36:06 +00:00
|
|
|
|
|
|
|
void registerStorageJoin(StorageFactory & factory)
|
|
|
|
{
|
|
|
|
factory.registerStorage("Join", [](const StorageFactory::Arguments & args)
|
|
|
|
{
|
|
|
|
/// Join(ANY, LEFT, k1, k2, ...)
|
|
|
|
|
|
|
|
ASTs & engine_args = args.engine_args;
|
|
|
|
|
2018-11-30 14:49:35 +00:00
|
|
|
auto & settings = args.context.getSettingsRef();
|
2019-12-25 17:32:55 +00:00
|
|
|
|
2018-11-30 14:49:35 +00:00
|
|
|
auto join_use_nulls = settings.join_use_nulls;
|
|
|
|
auto max_rows_in_join = settings.max_rows_in_join;
|
|
|
|
auto max_bytes_in_join = settings.max_bytes_in_join;
|
|
|
|
auto join_overflow_mode = settings.join_overflow_mode;
|
2019-01-24 17:12:05 +00:00
|
|
|
auto join_any_take_last_row = settings.join_any_take_last_row;
|
2019-12-25 17:32:55 +00:00
|
|
|
auto old_any_join = settings.any_join_distinct_right_table_keys;
|
2018-11-30 14:49:35 +00:00
|
|
|
|
|
|
|
if (args.storage_def && args.storage_def->settings)
|
|
|
|
{
|
2019-04-18 23:29:32 +00:00
|
|
|
for (const auto & setting : args.storage_def->settings->changes)
|
2018-11-30 14:49:35 +00:00
|
|
|
{
|
2018-12-18 01:26:12 +00:00
|
|
|
if (setting.name == "join_use_nulls")
|
|
|
|
join_use_nulls.set(setting.value);
|
|
|
|
else if (setting.name == "max_rows_in_join")
|
|
|
|
max_rows_in_join.set(setting.value);
|
|
|
|
else if (setting.name == "max_bytes_in_join")
|
|
|
|
max_bytes_in_join.set(setting.value);
|
|
|
|
else if (setting.name == "join_overflow_mode")
|
|
|
|
join_overflow_mode.set(setting.value);
|
2019-01-24 17:12:05 +00:00
|
|
|
else if (setting.name == "join_any_take_last_row")
|
|
|
|
join_any_take_last_row.set(setting.value);
|
2019-12-25 17:32:55 +00:00
|
|
|
else if (setting.name == "any_join_distinct_right_table_keys")
|
|
|
|
old_any_join.set(setting.value);
|
2018-11-30 14:49:35 +00:00
|
|
|
else
|
|
|
|
throw Exception(
|
|
|
|
"Unknown setting " + setting.name + " for storage " + args.engine_name,
|
|
|
|
ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-25 17:32:55 +00:00
|
|
|
if (engine_args.size() < 3)
|
|
|
|
throw Exception(
|
|
|
|
"Storage Join requires at least 3 parameters: Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT, keys...).",
|
|
|
|
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
|
|
|
|
|
|
|
ASTTableJoin::Strictness strictness = ASTTableJoin::Strictness::Unspecified;
|
|
|
|
ASTTableJoin::Kind kind = ASTTableJoin::Kind::Comma;
|
|
|
|
|
|
|
|
if (auto opt_strictness_id = tryGetIdentifierName(engine_args[0]))
|
|
|
|
{
|
|
|
|
const String strictness_str = Poco::toLower(*opt_strictness_id);
|
|
|
|
|
2019-12-27 12:41:55 +00:00
|
|
|
if (strictness_str == "any")
|
2019-12-25 17:32:55 +00:00
|
|
|
{
|
|
|
|
if (old_any_join)
|
|
|
|
strictness = ASTTableJoin::Strictness::RightAny;
|
|
|
|
else
|
|
|
|
strictness = ASTTableJoin::Strictness::Any;
|
|
|
|
}
|
2019-12-27 12:41:55 +00:00
|
|
|
else if (strictness_str == "all")
|
2019-12-25 17:32:55 +00:00
|
|
|
strictness = ASTTableJoin::Strictness::All;
|
2019-12-27 12:41:55 +00:00
|
|
|
else if (strictness_str == "semi")
|
2019-12-25 17:32:55 +00:00
|
|
|
strictness = ASTTableJoin::Strictness::Semi;
|
2019-12-27 12:41:55 +00:00
|
|
|
else if (strictness_str == "anti")
|
2019-12-25 17:32:55 +00:00
|
|
|
strictness = ASTTableJoin::Strictness::Anti;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (strictness == ASTTableJoin::Strictness::Unspecified)
|
2019-12-27 12:41:55 +00:00
|
|
|
throw Exception("First parameter of storage Join must be ANY or ALL or SEMI or ANTI (without quotes).",
|
|
|
|
ErrorCodes::BAD_ARGUMENTS);
|
2019-12-25 17:32:55 +00:00
|
|
|
|
|
|
|
if (auto opt_kind_id = tryGetIdentifierName(engine_args[1]))
|
|
|
|
{
|
|
|
|
const String kind_str = Poco::toLower(*opt_kind_id);
|
|
|
|
|
2019-12-27 12:41:55 +00:00
|
|
|
if (kind_str == "left")
|
2019-12-25 17:32:55 +00:00
|
|
|
kind = ASTTableJoin::Kind::Left;
|
2019-12-27 12:41:55 +00:00
|
|
|
else if (kind_str == "inner")
|
2019-12-25 17:32:55 +00:00
|
|
|
kind = ASTTableJoin::Kind::Inner;
|
2019-12-27 12:41:55 +00:00
|
|
|
else if (kind_str == "right")
|
2019-12-25 17:32:55 +00:00
|
|
|
kind = ASTTableJoin::Kind::Right;
|
2019-12-27 12:41:55 +00:00
|
|
|
else if (kind_str == "full")
|
2019-12-25 17:32:55 +00:00
|
|
|
{
|
|
|
|
if (strictness == ASTTableJoin::Strictness::Any)
|
|
|
|
strictness = ASTTableJoin::Strictness::RightAny;
|
|
|
|
kind = ASTTableJoin::Kind::Full;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (kind == ASTTableJoin::Kind::Comma)
|
2019-12-27 12:41:55 +00:00
|
|
|
throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes).",
|
|
|
|
ErrorCodes::BAD_ARGUMENTS);
|
2019-12-25 17:32:55 +00:00
|
|
|
|
|
|
|
Names key_names;
|
|
|
|
key_names.reserve(engine_args.size() - 2);
|
|
|
|
for (size_t i = 2, size = engine_args.size(); i < size; ++i)
|
|
|
|
{
|
|
|
|
auto opt_key = tryGetIdentifierName(engine_args[i]);
|
|
|
|
if (!opt_key)
|
|
|
|
throw Exception("Parameter №" + toString(i + 1) + " of storage Join don't look like column name.", ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
|
|
|
|
key_names.push_back(*opt_key);
|
|
|
|
}
|
|
|
|
|
2017-12-30 00:36:06 +00:00
|
|
|
return StorageJoin::create(
|
2019-10-25 19:07:47 +00:00
|
|
|
args.relative_data_path,
|
2019-07-09 15:40:21 +00:00
|
|
|
args.database_name,
|
2018-11-30 14:49:35 +00:00
|
|
|
args.table_name,
|
|
|
|
key_names,
|
2019-08-09 13:02:01 +00:00
|
|
|
join_use_nulls,
|
|
|
|
SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode},
|
2018-11-30 14:49:35 +00:00
|
|
|
kind,
|
|
|
|
strictness,
|
2018-12-30 15:54:45 +00:00
|
|
|
args.columns,
|
2019-08-24 21:20:20 +00:00
|
|
|
args.constraints,
|
2019-10-25 19:07:47 +00:00
|
|
|
join_any_take_last_row,
|
|
|
|
args.context);
|
2017-12-30 00:36:06 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2018-11-30 14:49:35 +00:00
|
|
|
template <typename T>
|
|
|
|
static const char * rawData(T & t)
|
|
|
|
{
|
|
|
|
return reinterpret_cast<const char *>(&t);
|
|
|
|
}
|
|
|
|
template <typename T>
|
|
|
|
static size_t rawSize(T &)
|
|
|
|
{
|
|
|
|
return sizeof(T);
|
|
|
|
}
|
|
|
|
template <>
|
|
|
|
const char * rawData(const StringRef & t)
|
|
|
|
{
|
|
|
|
return t.data;
|
|
|
|
}
|
|
|
|
template <>
|
|
|
|
size_t rawSize(const StringRef & t)
|
|
|
|
{
|
|
|
|
return t.size;
|
|
|
|
}
|
|
|
|
|
2019-01-23 14:48:50 +00:00
|
|
|
class JoinBlockInputStream : public IBlockInputStream
|
2018-11-30 14:49:35 +00:00
|
|
|
{
|
|
|
|
public:
|
2019-02-10 16:55:12 +00:00
|
|
|
JoinBlockInputStream(const Join & parent_, UInt64 max_block_size_, Block && sample_block_)
|
2019-12-19 15:50:28 +00:00
|
|
|
: parent(parent_), lock(parent.data->rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_))
|
2018-11-30 14:49:35 +00:00
|
|
|
{
|
|
|
|
columns.resize(sample_block.columns());
|
|
|
|
column_indices.resize(sample_block.columns());
|
|
|
|
column_with_null.resize(sample_block.columns());
|
|
|
|
for (size_t i = 0; i < sample_block.columns(); ++i)
|
|
|
|
{
|
|
|
|
auto & [_, type, name] = sample_block.getByPosition(i);
|
2019-09-11 15:57:09 +00:00
|
|
|
if (parent.right_table_keys.has(name))
|
2018-11-30 14:49:35 +00:00
|
|
|
{
|
|
|
|
key_pos = i;
|
2019-09-11 15:57:09 +00:00
|
|
|
column_with_null[i] = parent.right_table_keys.getByName(name).type->isNullable();
|
2018-11-30 14:49:35 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
auto pos = parent.sample_block_with_columns_to_add.getPositionByName(name);
|
|
|
|
column_indices[i] = pos;
|
|
|
|
column_with_null[i] = !parent.sample_block_with_columns_to_add.getByPosition(pos).type->equals(*type);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
String getName() const override { return "Join"; }
|
|
|
|
|
|
|
|
Block getHeader() const override { return sample_block; }
|
|
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
Block readImpl() override
|
|
|
|
{
|
2019-12-19 15:50:28 +00:00
|
|
|
if (parent.data->blocks.empty())
|
2018-11-30 14:49:35 +00:00
|
|
|
return Block();
|
|
|
|
|
2018-12-30 15:54:45 +00:00
|
|
|
Block block;
|
2019-12-19 15:50:28 +00:00
|
|
|
if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps,
|
2019-12-25 17:32:55 +00:00
|
|
|
[&](auto kind, auto strictness, auto & map) { block = createBlock<kind, strictness>(map); }))
|
|
|
|
throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR);
|
2018-12-30 15:54:45 +00:00
|
|
|
return block;
|
2018-11-30 14:49:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
const Join & parent;
|
|
|
|
std::shared_lock<std::shared_mutex> lock;
|
2019-02-10 16:55:12 +00:00
|
|
|
UInt64 max_block_size;
|
2018-11-30 14:49:35 +00:00
|
|
|
Block sample_block;
|
|
|
|
|
|
|
|
ColumnNumbers column_indices;
|
|
|
|
std::vector<bool> column_with_null;
|
|
|
|
std::optional<size_t> key_pos;
|
|
|
|
MutableColumns columns;
|
|
|
|
|
|
|
|
std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure
|
|
|
|
|
|
|
|
|
2019-12-25 17:32:55 +00:00
|
|
|
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
|
2018-11-30 14:49:35 +00:00
|
|
|
Block createBlock(const Maps & maps)
|
|
|
|
{
|
|
|
|
for (size_t i = 0; i < sample_block.columns(); ++i)
|
|
|
|
{
|
|
|
|
const auto & src_col = sample_block.safeGetByPosition(i);
|
|
|
|
columns[i] = src_col.type->createColumn();
|
|
|
|
if (column_with_null[i])
|
|
|
|
{
|
|
|
|
if (key_pos == i)
|
|
|
|
{
|
|
|
|
// unwrap null key column
|
2019-08-21 02:28:04 +00:00
|
|
|
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*columns[i]);
|
2018-11-30 14:49:35 +00:00
|
|
|
columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
// wrap non key column with null
|
|
|
|
columns[i] = makeNullable(std::move(columns[i]))->assumeMutable();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t rows_added = 0;
|
|
|
|
|
2019-12-19 15:50:28 +00:00
|
|
|
switch (parent.data->type)
|
2018-11-30 14:49:35 +00:00
|
|
|
{
|
|
|
|
#define M(TYPE) \
|
|
|
|
case Join::Type::TYPE: \
|
2019-12-25 17:32:55 +00:00
|
|
|
rows_added = fillColumns<KIND, STRICTNESS>(*maps.TYPE); \
|
2018-11-30 14:49:35 +00:00
|
|
|
break;
|
|
|
|
APPLY_FOR_JOIN_VARIANTS_LIMITED(M)
|
|
|
|
#undef M
|
|
|
|
|
|
|
|
default:
|
2019-12-19 15:50:28 +00:00
|
|
|
throw Exception("Unsupported JOIN keys in StorageJoin. Type: " + toString(static_cast<UInt32>(parent.data->type)),
|
2019-08-05 14:03:14 +00:00
|
|
|
ErrorCodes::UNSUPPORTED_JOIN_KEYS);
|
2018-11-30 14:49:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (!rows_added)
|
|
|
|
return {};
|
|
|
|
|
|
|
|
Block res = sample_block.cloneEmpty();
|
|
|
|
for (size_t i = 0; i < columns.size(); ++i)
|
|
|
|
if (column_with_null[i])
|
|
|
|
{
|
|
|
|
if (key_pos == i)
|
2019-03-25 01:43:54 +00:00
|
|
|
res.getByPosition(i).column = makeNullable(std::move(columns[i]));
|
2018-11-30 14:49:35 +00:00
|
|
|
else
|
|
|
|
{
|
2019-08-21 02:28:04 +00:00
|
|
|
const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]);
|
2018-11-30 14:49:35 +00:00
|
|
|
res.getByPosition(i).column = nullable_col.getNestedColumnPtr();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
res.getByPosition(i).column = std::move(columns[i]);
|
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
2019-12-25 17:32:55 +00:00
|
|
|
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
|
2018-11-30 14:49:35 +00:00
|
|
|
size_t fillColumns(const Map & map)
|
|
|
|
{
|
|
|
|
size_t rows_added = 0;
|
|
|
|
|
|
|
|
if (!position)
|
|
|
|
position = decltype(position)(
|
|
|
|
static_cast<void *>(new typename Map::const_iterator(map.begin())),
|
|
|
|
[](void * ptr) { delete reinterpret_cast<typename Map::const_iterator *>(ptr); });
|
|
|
|
|
|
|
|
auto & it = *reinterpret_cast<typename Map::const_iterator *>(position.get());
|
|
|
|
auto end = map.end();
|
|
|
|
|
|
|
|
for (; it != end; ++it)
|
|
|
|
{
|
2019-11-07 21:32:44 +00:00
|
|
|
if constexpr (STRICTNESS == ASTTableJoin::Strictness::RightAny)
|
2018-11-30 14:49:35 +00:00
|
|
|
{
|
2019-12-25 17:32:55 +00:00
|
|
|
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
|
|
|
|
}
|
|
|
|
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
|
|
|
|
{
|
|
|
|
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
|
2018-11-30 14:49:35 +00:00
|
|
|
}
|
2019-11-07 21:32:44 +00:00
|
|
|
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
|
|
|
|
{
|
2019-12-25 17:32:55 +00:00
|
|
|
if constexpr (KIND == ASTTableJoin::Kind::Left || KIND == ASTTableJoin::Kind::Inner)
|
|
|
|
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
|
|
|
|
else if constexpr (KIND == ASTTableJoin::Kind::Right)
|
|
|
|
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
|
|
|
|
}
|
|
|
|
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Semi)
|
|
|
|
{
|
|
|
|
if constexpr (KIND == ASTTableJoin::Kind::Left)
|
|
|
|
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
|
|
|
|
else if constexpr (KIND == ASTTableJoin::Kind::Right)
|
|
|
|
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
|
2019-11-07 21:32:44 +00:00
|
|
|
}
|
2019-12-25 17:32:55 +00:00
|
|
|
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Anti)
|
2019-03-26 22:05:51 +00:00
|
|
|
{
|
2019-12-25 17:32:55 +00:00
|
|
|
if constexpr (KIND == ASTTableJoin::Kind::Left)
|
|
|
|
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
|
|
|
|
else if constexpr (KIND == ASTTableJoin::Kind::Right)
|
|
|
|
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
|
2019-03-20 16:58:28 +00:00
|
|
|
}
|
2018-11-30 14:49:35 +00:00
|
|
|
else
|
2019-12-25 17:32:55 +00:00
|
|
|
throw Exception("This JOIN is not implemented yet", ErrorCodes::NOT_IMPLEMENTED);
|
2018-11-30 14:49:35 +00:00
|
|
|
|
|
|
|
if (rows_added >= max_block_size)
|
|
|
|
{
|
|
|
|
++it;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return rows_added;
|
|
|
|
}
|
2019-12-25 17:32:55 +00:00
|
|
|
|
|
|
|
template <typename Map>
|
|
|
|
static void fillOne(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
|
|
|
|
const std::optional<size_t> & key_pos, size_t & rows_added)
|
|
|
|
{
|
|
|
|
for (size_t j = 0; j < columns.size(); ++j)
|
|
|
|
if (j == key_pos)
|
|
|
|
columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
|
|
|
|
else
|
|
|
|
columns[j]->insertFrom(*it->getMapped().block->getByPosition(column_indices[j]).column.get(), it->getMapped().row_num);
|
|
|
|
++rows_added;
|
|
|
|
}
|
|
|
|
|
|
|
|
template <typename Map>
|
|
|
|
static void fillAll(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
|
|
|
|
const std::optional<size_t> & key_pos, size_t & rows_added)
|
|
|
|
{
|
|
|
|
for (auto ref_it = it->getMapped().begin(); ref_it.ok(); ++ref_it)
|
|
|
|
{
|
|
|
|
for (size_t j = 0; j < columns.size(); ++j)
|
|
|
|
if (j == key_pos)
|
|
|
|
columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
|
|
|
|
else
|
|
|
|
columns[j]->insertFrom(*ref_it->block->getByPosition(column_indices[j]).column.get(), ref_it->row_num);
|
|
|
|
++rows_added;
|
|
|
|
}
|
|
|
|
}
|
2018-11-30 14:49:35 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: multiple stream read and index read
|
|
|
|
BlockInputStreams StorageJoin::read(
|
|
|
|
const Names & column_names,
|
|
|
|
const SelectQueryInfo & /*query_info*/,
|
|
|
|
const Context & /*context*/,
|
|
|
|
QueryProcessingStage::Enum /*processed_stage*/,
|
2019-02-18 23:38:44 +00:00
|
|
|
size_t max_block_size,
|
2018-11-30 14:49:35 +00:00
|
|
|
unsigned /*num_streams*/)
|
|
|
|
{
|
|
|
|
check(column_names);
|
2019-01-04 12:10:00 +00:00
|
|
|
return {std::make_shared<JoinBlockInputStream>(*join, max_block_size, getSampleBlockForColumns(column_names))};
|
2018-11-30 14:49:35 +00:00
|
|
|
}
|
|
|
|
|
2015-01-27 21:24:24 +00:00
|
|
|
}
|