This commit is contained in:
Evgeniy Gatov 2015-12-11 12:52:35 +03:00
commit 1fc378b003
6 changed files with 131 additions and 9 deletions

View File

@ -181,7 +181,7 @@ private:
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnUInt32 *>(column)->insert(value.numberInt());
static_cast<ColumnUInt32 *>(column)->insert(value.numberLong());
break;
}
case value_type_t::UInt64:

View File

@ -60,7 +60,7 @@ void filterArraysImpl(
if (result_size_hint < 0)
res_elems.reserve(src_elems.size());
else if (result_size_hint < 1000000000 && src_elems.size() < 1000000000) /// Избегаем переполнения.
res_elems.reserve(result_size_hint * src_elems.size() / size);
res_elems.reserve((result_size_hint * src_elems.size() + size - 1) / size);
}
IColumn::Offset_t current_src_offset = 0;
@ -86,7 +86,7 @@ void filterArraysImpl(
res_offsets.push_back(current_src_offset);
const auto elems_size_old = res_elems.size();
res_elems.resize_assume_reserved(elems_size_old + size);
res_elems.resize(elems_size_old + size);
memcpy(&res_elems[elems_size_old], &src_elems[offset], size * sizeof(T));
};

View File

@ -875,9 +875,6 @@ void Aggregator::writeToTemporaryFileImpl(
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
if (method.data.impls[bucket].empty())
continue;
Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
out.write(block);
@ -977,6 +974,9 @@ void Aggregator::convertToBlockImpl(
const Sizes & key_sizes,
bool final) const
{
if (data.empty())
return;
if (final)
convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns, key_sizes);
else
@ -2116,7 +2116,8 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << blocks.front().info.bucket_num << ").");
auto bucket_num = blocks.front().info.bucket_num;
LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
for (Block & block : blocks)
{
@ -2188,7 +2189,9 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
if (merged_blocks.empty())
return {};
return merged_blocks.front();
auto res = std::move(merged_blocks.front());
res.info.bucket_num = bucket_num;
return res;
}

View File

@ -329,7 +329,7 @@ void StorageBuffer::flushAllBuffers(const bool check_thresholds)
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{
Block block_to_write = buffer.data.cloneEmpty();
Block block_to_write;
time_t current_time = time(0);
size_t rows = 0;
@ -345,6 +345,8 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{
std::lock_guard<std::mutex> lock(buffer.mutex);
block_to_write = buffer.data.cloneEmpty();
rows = buffer.data.rowsInFirstColumn();
bytes = buffer.data.bytes();
if (buffer.first_write_time)

View File

@ -0,0 +1,100 @@
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....
0
1.
10
100
1000
1001.
1002..
1003...
1004....
1005.....

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS test.numbers_10;
SET max_block_size = 1000;
CREATE TABLE test.numbers_10 ENGINE = Log AS SELECT * FROM system.numbers LIMIT 10000;
SET distributed_aggregation_memory_efficient = 1, group_by_two_level_threshold = 5000;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
SELECT concat(toString(number), arrayStringConcat(arrayMap(x -> '.', range(number % 10)))) AS k FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 4999 : 10000) GROUP BY k ORDER BY k LIMIT 10;
DROP TABLE test.numbers_10;