Review fixes

This commit is contained in:
vdimir 2023-01-20 16:30:34 +00:00
parent 60acd5e424
commit e30ab0874b
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
6 changed files with 27 additions and 31 deletions

View File

@ -841,9 +841,6 @@ Block concatenateBlocks(const std::vector<Block> & blocks)
if (blocks.empty())
return {};
if (blocks.size() == 1)
return blocks.front();
size_t num_rows = 0;
for (const auto & block : blocks)
num_rows += block.rows();

View File

@ -59,21 +59,22 @@ namespace
Blocks blocks;
size_t rows_read = 0;
while (true)
do
{
Block block = reader.read();
rows_read += block.rows();
if (!block)
{
eof = true;
if (blocks.size() == 1)
return blocks.front();
return concatenateBlocks(blocks);
}
blocks.push_back(std::move(block));
} while (rows_read < result_block_size);
if (rows_read >= result_block_size)
break;
}
if (blocks.size() == 1)
return blocks.front();
return concatenateBlocks(blocks);
}
@ -121,20 +122,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable
public:
using BucketLock = std::unique_lock<std::mutex>;
struct Stats
{
TemporaryFileStream::Stat left;
TemporaryFileStream::Stat right;
};
explicit FileBucket(size_t bucket_index_, GraceHashJoin & grace_join_)
explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, Poco::Logger * log_)
: idx{bucket_index_}
, left_file{
grace_join_.tmp_data->createStream(grace_join_.left_sample_block)}
, right_file{
grace_join_.tmp_data->createStream(grace_join_.prepareRightBlock(grace_join_.right_sample_block))}
, left_file{left_file_}
, right_file{right_file_}
, state{State::WRITING_BLOCKS}
, log{grace_join_.log}
, log{log_}
{
}
@ -170,8 +163,6 @@ public:
bool empty() const { return is_empty.load(); }
Stats getStat() const { return stats; }
AccumulatedBlockReader startJoining()
{
LOG_TRACE(log, "Joining file bucket {}", idx);
@ -179,8 +170,8 @@ public:
std::unique_lock<std::mutex> left_lock(left_file_mutex);
std::unique_lock<std::mutex> right_lock(right_file_mutex);
stats.left = left_file.finishWriting();
stats.right = right_file.finishWriting();
left_file.finishWriting();
right_file.finishWriting();
state = State::JOINING_BLOCKS;
@ -232,7 +223,6 @@ private:
std::atomic_bool is_empty = true;
std::atomic<State> state;
Stats stats;
Poco::Logger * log;
};
@ -395,7 +385,10 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
void GraceHashJoin::addBucket(Buckets & destination)
{
BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), *this);
auto & left_file = tmp_data->createStream(left_sample_block);
auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
destination.emplace_back(std::move(new_bucket));
}
@ -649,7 +642,10 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_blocks.emplace_back(std::move(blocks[bucket_index]));
}
current_block = concatenateBlocks(current_blocks);
if (current_blocks.size() == 1)
current_block = std::move(current_blocks.front());
else
current_block = concatenateBlocks(current_blocks);
}
hash_join = makeInMemoryJoin();

View File

@ -95,7 +95,7 @@ private:
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);
/// Check that join satisifes limits on rows/bytes in table_join.
/// Check that join satisfies limits on rows/bytes in table_join.
bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const;
bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const;
bool hasMemoryOverflow(const BlocksList & blocks) const;

View File

@ -1774,10 +1774,10 @@ HashJoin::~HashJoin()
{
if (!data)
{
LOG_DEBUG(log, "Join data has been released");
LOG_TRACE(log, "Join data has been released");
return;
}
LOG_DEBUG(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount());
LOG_TRACE(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount());
}
template <typename Mapped>
@ -2058,7 +2058,7 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
{
LOG_DEBUG(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount());
LOG_TRACE(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount());
BlocksList right_blocks = std::move(data->blocks);
if (!restructure)

View File

@ -21,6 +21,7 @@ using UInt8ColumnDataPtr = const ColumnUInt8::Container *;
namespace JoinCommon
{
/// Helper interface to work with mask from JOIN ON section
class JoinMask
{
public:

View File

@ -30,6 +30,8 @@ SELECT t1.key, t1.key2 FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key ==
SELECT '--';
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2;
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 0; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 1; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT '--';
SELECT '333' = t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND t2.id > 2;