2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/Set.h>
|
|
|
|
#include <Interpreters/Join.h>
|
2018-02-19 20:42:05 +00:00
|
|
|
#include <DataStreams/materializeBlock.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <DataStreams/IBlockOutputStream.h>
|
|
|
|
#include <DataStreams/CreatingSetsBlockInputStream.h>
|
|
|
|
#include <Storages/IStorage.h>
|
2014-03-04 11:26:55 +00:00
|
|
|
#include <iomanip>
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
|
2014-03-04 11:26:55 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
/// Error codes referenced in this file. They are only declared here;
/// the `extern` linkage means the definitions are provided by another translation unit.
namespace ErrorCodes
{
    /// Used by createOne() when a subquery has neither a set, a join nor a table to fill.
    extern const int LOGICAL_ERROR;
    /// Passed to the network transfer limits check for the "IN/JOIN external table" case.
    extern const int SET_SIZE_LIMIT_EXCEEDED;
}
|
|
|
|
|
|
|
|
|
2018-04-19 21:34:04 +00:00
|
|
|
/** Stores a copy of the subqueries and the transfer limits, and registers child streams.
  *
  * Every subquery that has a source stream contributes that stream as a child.
  * `input` is appended after all of them, so it is always `children.back()`.
  */
CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
    const BlockInputStreamPtr & input,
    const SubqueriesForSets & subqueries_for_sets_,
    const SizeLimits & network_transfer_limits)
    : subqueries_for_sets(subqueries_for_sets_),
    network_transfer_limits(network_transfer_limits)
{
    for (auto & entry : subqueries_for_sets)
    {
        auto & subquery = entry.second;

        /// A subquery without a source contributes no child stream.
        if (!subquery.source)
            continue;

        children.push_back(subquery.source);

        /// The set receives the header of the stream it will later be filled from.
        if (subquery.set)
            subquery.set->setHeader(subquery.source->getHeader());
    }

    /// The main stream goes last.
    children.push_back(input);
}
|
|
|
|
|
|
|
|
|
2014-03-04 11:26:55 +00:00
|
|
|
/// Runs createAll() first, then reads the next block from the last child (the main stream).
/// If the stream turns out to be cancelled after createAll(), a default-constructed Block is returned instead.
Block CreatingSetsBlockInputStream::readImpl()
{
    createAll();

    if (isCancelledOrThrowIfKilled())
        return Block();

    return children.back()->read();
}
|
|
|
|
|
2015-04-16 09:55:24 +00:00
|
|
|
|
2016-05-31 01:04:34 +00:00
|
|
|
void CreatingSetsBlockInputStream::readPrefixImpl()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
createAll();
|
2016-05-31 01:04:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-02-23 10:43:24 +00:00
|
|
|
/// Forwards the request for totals to the last child, which is the main input stream.
Block CreatingSetsBlockInputStream::getTotals()
{
    const auto & main_stream = children.back();
    return main_stream->getTotals();
}
|
|
|
|
|
|
|
|
|
2016-05-31 01:04:34 +00:00
|
|
|
void CreatingSetsBlockInputStream::createAll()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!created)
|
|
|
|
{
|
|
|
|
for (auto & elem : subqueries_for_sets)
|
|
|
|
{
|
|
|
|
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
|
|
|
|
{
|
2018-03-05 21:09:39 +00:00
|
|
|
if (isCancelledOrThrowIfKilled())
|
2017-04-01 07:20:54 +00:00
|
|
|
return;
|
|
|
|
|
|
|
|
createOne(elem.second);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
created = true;
|
|
|
|
}
|
2016-05-31 01:04:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/** Reads the subquery's source stream and feeds every block to the consumers the subquery has:
  * the set, the join and/or the temporary table.
  *
  * A consumer is considered finished once its insert (or, for the table, the transfer limits check)
  * returns false. When all consumers are finished, the source is cancelled and reading stops.
  *
  * If this stream is cancelled while reading, the function returns immediately:
  * the table suffix is not written, join totals are not set and no summary is logged.
  *
  * Throws LOGICAL_ERROR if the subquery has no set, no join and no table.
  */
void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
    LOG_TRACE(log, (subquery.set ? "Creating set. " : "")
        << (subquery.join ? "Creating join. " : "")
        << (subquery.table ? "Filling temporary table. " : ""));
    Stopwatch watch;

    BlockOutputStreamPtr table_out;
    if (subquery.table)
        table_out = subquery.table->write({}, {});

    /// A consumer that is absent counts as finished from the start.
    bool set_finished = !subquery.set;
    bool join_finished = !subquery.join;
    bool table_finished = !subquery.table;

    if (set_finished && join_finished && table_finished)
        throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR);

    if (table_out)
        table_out->writePrefix();

    while (Block block = subquery.source->read())
    {
        if (isCancelled())
        {
            LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
            return;
        }

        if (!set_finished)
            set_finished = !subquery.set->insertFromBlock(block);

        if (!join_finished)
        {
            /// Rename aliased columns: the renamed column is removed from its position and re-inserted into the block.
            for (const auto & name_with_alias : subquery.joined_block_aliases)
            {
                const auto & original_name = name_with_alias.first;
                if (!block.has(original_name))
                    continue;

                auto pos = block.getPositionByName(original_name);
                /// Copy the column before erasing it from the block.
                auto column = block.getByPosition(pos);
                block.erase(pos);
                column.name = name_with_alias.second;
                block.insert(std::move(column));
            }

            if (subquery.joined_block_actions)
                subquery.joined_block_actions->execute(block);

            join_finished = !subquery.join->insertFromBlock(block);
        }

        if (!table_finished)
        {
            /// Note: at this point the block may already carry the changes made for the join above.
            block = materializeBlock(block);
            table_out->write(block);

            rows_to_transfer += block.rows();
            bytes_to_transfer += block.bytes();

            table_finished = !network_transfer_limits.check(rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
        }

        /// Nobody needs more data: stop the source and leave the loop.
        if (set_finished && join_finished && table_finished)
        {
            subquery.source->cancel(false);
            break;
        }
    }

    if (table_out)
        table_out->writeSuffix();

    watch.stop();

    /// Number of rows reported by the source's profile info.
    const size_t head_rows = subquery.source->getProfileInfo().rows;

    if (subquery.join)
        subquery.join->setTotals(subquery.source->getTotals());

    if (head_rows == 0)
    {
        LOG_DEBUG(log, "Subquery has empty result.");
        return;
    }

    std::stringstream msg;
    msg << std::fixed << std::setprecision(3) << "Created. ";

    if (subquery.set)
        msg << "Set with " << subquery.set->getTotalRowCount() << " entries from " << head_rows << " rows. ";
    if (subquery.join)
        msg << "Join with " << subquery.join->getTotalRowCount() << " entries from " << head_rows << " rows. ";
    if (subquery.table)
        msg << "Table with " << head_rows << " rows. ";

    msg << "In " << watch.elapsedSeconds() << " sec.";
    LOG_DEBUG(log, msg.rdbuf());
}
|
|
|
|
|
|
|
|
}
|