ClickHouse/src/Processors/Transforms/CreatingSetsTransform.cpp

133 lines
3.1 KiB
C++
Raw Normal View History

2019-03-26 14:11:38 +00:00
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Set.h>
#include <Interpreters/IJoin.h>
2019-03-26 14:11:38 +00:00
#include <Storages/IStorage.h>
#include <iomanip>
#include <DataStreams/materializeBlock.h>
namespace DB
{
namespace ErrorCodes
{
    /// Error codes referenced in this translation unit.
    /// They are only declared here (`extern`); the definitions live elsewhere.
    extern const int SET_SIZE_LIMIT_EXCEEDED;
    extern const int LOGICAL_ERROR;
}
/// Constructs the transform.
///
/// @param in_header_                Header of the input port; forwarded to IAccumulatingTransform.
/// @param out_header_               Header of the output port; forwarded to IAccumulatingTransform.
/// @param subquery_for_set_         Description of the subquery (its `set` and/or `table` are used later).
/// @param network_transfer_limits_  Limits checked against rows/bytes written to the table in consume().
/// @param context_                  Context stored through the WithContext base.
///
/// All by-value arguments except the context are moved into the bases/members;
/// no other work is done here (see init() for the deferred setup).
CreatingSetsTransform::CreatingSetsTransform(
    Block in_header_,
    Block out_header_,
    SubqueryForSet subquery_for_set_,
    SizeLimits network_transfer_limits_,
    ContextPtr context_)
    : IAccumulatingTransform(std::move(in_header_), std::move(out_header_))
    , WithContext(context_)
    , subquery(std::move(subquery_for_set_))
    , network_transfer_limits(std::move(network_transfer_limits_))
{
}
2020-09-01 13:53:11 +00:00
void CreatingSetsTransform::work()
{
if (!is_initialized)
init();
2019-03-26 14:11:38 +00:00
2020-09-01 13:53:11 +00:00
IAccumulatingTransform::work();
2019-03-26 14:11:38 +00:00
}
2020-09-01 13:53:11 +00:00
void CreatingSetsTransform::startSubquery()
2019-03-26 14:11:38 +00:00
{
2020-05-23 21:16:05 +00:00
if (subquery.set)
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "Creating set.");
2020-05-23 21:16:05 +00:00
if (subquery.table)
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "Filling temporary table.");
2019-03-26 14:11:38 +00:00
if (subquery.table)
table_out = subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext());
2019-03-26 14:11:38 +00:00
done_with_set = !subquery.set;
done_with_table = !subquery.table;
2021-04-28 17:32:12 +00:00
if (done_with_set /*&& done_with_join*/ && done_with_table)
2019-03-26 14:11:38 +00:00
throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR);
if (table_out)
table_out->writePrefix();
}
2020-09-01 13:53:11 +00:00
void CreatingSetsTransform::finishSubquery()
2019-03-26 14:11:38 +00:00
{
2020-09-01 13:53:11 +00:00
if (read_rows != 0)
2019-03-26 14:11:38 +00:00
{
2020-09-01 13:53:11 +00:00
auto seconds = watch.elapsedNanoseconds() / 1e9;
2019-03-26 14:11:38 +00:00
if (subquery.set)
2020-09-01 13:53:11 +00:00
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
2019-03-26 14:11:38 +00:00
if (subquery.table)
2020-09-01 13:53:11 +00:00
LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
2019-03-26 14:11:38 +00:00
}
else
{
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Subquery has empty result.");
2019-03-26 14:11:38 +00:00
}
}
2019-04-09 11:09:46 +00:00
void CreatingSetsTransform::init()
{
is_initialized = true;
2020-09-01 13:53:11 +00:00
if (subquery.set)
subquery.set->setHeader(getInputPort().getHeader());
watch.restart();
startSubquery();
2019-04-09 11:09:46 +00:00
}
2020-09-01 13:53:11 +00:00
/// Consumes one chunk of the subquery result.
///
/// The chunk's row count is added to `read_rows`, and its columns are attached
/// to a clone of the input header to form a block. That block is then:
///  - inserted into the set, until insertFromBlock() returns false;
///  - materialized and written to the table output, until the transfer-limit
///    check returns false (rows/bytes written so far are accumulated for that check).
/// Once both sinks are done, finishConsume() is called.
void CreatingSetsTransform::consume(Chunk chunk)
{
    read_rows += chunk.getNumRows();
    auto input_block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());

    if (!done_with_set)
    {
        /// The flag is false on entry to this branch, so a direct assignment of the negated result is equivalent.
        done_with_set = !subquery.set->insertFromBlock(input_block);
    }

    if (!done_with_table)
    {
        input_block = materializeBlock(input_block);
        table_out->write(input_block);

        rows_to_transfer += input_block.rows();
        bytes_to_transfer += input_block.bytes();

        const bool within_limits = network_transfer_limits.check(
            rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);

        /// Same reasoning as above: the flag is false inside this branch.
        done_with_table = !within_limits;
    }

    if (!done_with_set || !done_with_table)
        return;

    finishConsume();
}
2020-09-01 13:53:11 +00:00
/// Finalizes the sinks and reports the result.
///
/// Calls finishInsert() on the set (if present) and writeSuffix() on the table
/// output (if it was opened), then logs the summary via finishSubquery().
/// @return Always a default-constructed (empty) Chunk.
Chunk CreatingSetsTransform::generate()
{
    if (subquery.set)
    {
        subquery.set->finishInsert();
    }

    if (table_out)
    {
        table_out->writeSuffix();
    }

    finishSubquery();

    return {};
}
2019-03-26 14:11:38 +00:00
}