Andrey Mironov 2014-09-12 20:05:29 +04:00
parent 13a77ffaa1
commit 09a570ffc0
8 changed files with 118 additions and 156 deletions

View File

@@ -3,6 +3,7 @@
#include <vector>
#include <map>
#include <list>
#include <initializer_list>
#include <DB/Core/ColumnWithNameAndType.h>
#include <DB/Core/NamesAndTypes.h>
@@ -32,7 +33,16 @@ private:
IndexByName_t index_by_name;
public:
Block() {}
Block() = default;
Block(std::initializer_list<ColumnWithNameAndType> il) : data{il}
{
index_by_position.reserve(il.size());
for (auto it = std::begin(data); it != std::end(data); ++it)
{
index_by_name[it->name] = it;
index_by_position.push_back(it);
}
}
/// needed so that the indices are copied correctly
Block(const Block & other);
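The new initializer-list constructor lets a Block be built directly from a braced list of columns, which the system storages later in this commit rely on. A minimal usage sketch, assuming the usual column and data-type headers are included (the column names are illustrative, not taken from this commit):
Block block{
    ColumnWithNameAndType{new ColumnUInt64, new DataTypeUInt64, "rows"},
    ColumnWithNameAndType{new ColumnString, new DataTypeString, "name"}
};
/// index_by_name and index_by_position are filled in declaration order,
/// so block.getByPosition(0).name == "rows"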

View File

@@ -22,7 +22,7 @@ struct BlockStreamProfileInfo
{
bool started = false;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time including waiting
String stream_name; /// Short name of the stream for which the info is collected
size_t rows = 0;
@@ -47,14 +47,14 @@ struct BlockStreamProfileInfo
bool hasAppliedLimit() const;
void update(Block & block);
/// Methods for binary [de]serialization
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
private:
void calculateRowsBeforeLimit() const;
/// Provide accessors for these fields, since they have to be calculated beforehand.
mutable bool applied_limit = false; /// Whether LIMIT was applied
mutable size_t rows_before_limit = 0;
@@ -80,7 +80,7 @@ public:
void readSuffix();
/// Get information about execution speed.
const BlockStreamProfileInfo & getInfo() const;
const BlockStreamProfileInfo & getInfo() const { return info; }
/** Get "total" values.
* The default implementation takes them from itself or from the first child source that has them.
@@ -88,7 +88,7 @@ public:
* There may be no totals - then an empty block is returned.
*/
virtual const Block & getTotals();
/// The same for minimums and maximums.
const Block & getExtremes() const;
@@ -152,7 +152,7 @@ public:
struct LocalLimits
{
LimitsMode mode;
size_t max_rows_to_read;
size_t max_bytes_to_read;
OverflowMode read_overflow_mode;
@@ -207,7 +207,7 @@ protected:
Block extremes;
/// Limits and quotas.
LocalLimits limits;
QuotaForIntervals * quota = nullptr; /// If nullptr, the quota is not used.

View File

@@ -112,6 +112,9 @@ private:
parent.have_space.signal();
}
Element * operator->() { return &*it; }
const Element * operator->() const { return &*it; }
Element & get() { return *it; }
const Element & get() const { return *it; }
};
@@ -167,23 +170,13 @@ public:
}
/// Number of queries executing concurrently.
size_t size() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return cur_size;
}
size_t size() const { return cur_size; }
/// Get the current state (a copy) of the query list.
Containter get() const
{
Containter res;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
res = cont;
}
return res;
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return cont;
}
void setMaxSize(size_t max_size_)
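A note on the simplified ProcessList::get() above: returning cont by value while the ScopedLock is in scope is still a copy made under the mutex, because the return value is constructed before local destructors (and hence the unlock) run. A self-contained sketch of the same pattern, using std::mutex in place of Poco::FastMutex purely for illustration:
#include <list>
#include <mutex>
#include <string>

struct Element { std::string query; };

struct ToyProcessList
{
    mutable std::mutex mutex;
    std::list<Element> cont;

    /// The copy of cont is constructed while the mutex is still held;
    /// the lock_guard's destructor (which unlocks) runs only afterwards.
    std::list<Element> get() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return cont;
    }
};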

View File

@@ -19,11 +19,11 @@ class MergeList
const std::string table;
const std::string result_part_name;
std::uint64_t num_parts{};
std::uint64_t total_size_bytes{};
std::uint64_t total_size_bytes_compressed{};
std::uint64_t total_size_marks{};
std::uint64_t bytes_read{};
std::uint64_t bytes_read_uncompressed{};
std::uint64_t rows_read{};
std::uint64_t bytes_written{};
std::uint64_t bytes_written_uncompressed{};
std::uint64_t rows_written{};
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)

View File

@@ -353,12 +353,6 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes)
}
const BlockStreamProfileInfo & IProfilingBlockInputStream::getInfo() const
{
return info;
}
void IProfilingBlockInputStream::cancel()
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))

View File

@@ -281,7 +281,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
Names part_columns = part->columns.getNames();
union_columns_set.insert(part_columns.begin(), part_columns.end());
merge_entry->total_size_bytes += part->size_in_bytes;
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
}
@@ -304,63 +304,64 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
for (size_t i = 0; i < parts.size(); ++i)
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
auto input = stdext::make_unique<MergeTreeBlockInputStream>(
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, ""), data.getPrimaryExpression()));
parts[i], ranges, false, nullptr, "");
input->setProgressCallback([&merge_entry] (const std::size_t rows, const std::size_t bytes) {
merge_entry->rows_read += rows;
merge_entry->bytes_read_uncompressed += bytes;
});
src_streams.push_back(new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression()));
sum_rows_approx += parts[i]->size * data.index_granularity;
}
/// The order of the streams matters: when keys are equal, elements go in the order of the source stream number.
/// In the merged part, rows with the same key must go in ascending order of the source part identifier,
/// that is, in (roughly) ascending order of insertion time.
BlockInputStreamPtr merged_stream;
std::unique_ptr<IProfilingBlockInputStream> merged_stream;
switch (data.mode)
{
case MergeTreeData::Ordinary:
merged_stream = new MergingSortedBlockInputStream(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = stdext::make_unique<MergingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Collapsing:
merged_stream = new CollapsingSortedBlockInputStream(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = stdext::make_unique<CollapsingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Summing:
merged_stream = new SummingSortedBlockInputStream(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = stdext::make_unique<SummingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Aggregating:
merged_stream = new AggregatingSortedBlockInputStream(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = stdext::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
}
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
const String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, union_columns);
MergedBlockOutputStream to{data, new_part_tmp_path, union_columns};
merged_stream->readPrefix();
to->writePrefix();
to.writePrefix();
size_t rows_written = 0;
size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
Block block;
while (!canceled && (block = merged_stream->read()))
{
const auto rows = block.rows();
const auto bytes = block.bytes();
rows_written += block.rows();
to.write(block);
merge_entry->rows_read += rows;
merge_entry->bytes_read += bytes;
rows_written += rows;
to->write(block);
merge_entry->rows_written += rows;
merge_entry->bytes_written += bytes;
merge_entry->rows_written = merged_stream->getInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getInfo().bytes;
if (disk_reservation)
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
@@ -371,14 +372,14 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
merged_stream->readSuffix();
new_data_part->columns = union_columns;
new_data_part->checksums = to->writeSuffixAndGetChecksums();
new_data_part->index.swap(to->getIndex());
new_data_part->checksums = to.writeSuffixAndGetChecksums();
new_data_part->index.swap(to.getIndex());
/// For convenience, even CollapsingSortedBlockInputStream cannot output zero rows.
if (0 == to->marksCount())
if (0 == to.marksCount())
throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR);
new_data_part->size = to->marksCount();
new_data_part->size = to.marksCount();
new_data_part->modification_time = time(0);
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part_tmp_path);
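The per-part progress callbacks above accumulate the read-side counters (rows_read, bytes_read_uncompressed) as deltas, while the write-side counters are assigned from merged_stream->getInfo(), whose totals are already cumulative. A self-contained toy of the callback pattern, assuming (as the lambda above does) that the callback receives per-block deltas; all names are illustrative except setProgressCallback and the lambda signature, which are taken from this diff:
#include <cstddef>
#include <functional>

struct ToyMergeEntry { std::size_t rows_read = 0; std::size_t bytes_read_uncompressed = 0; };

struct ToyInputStream
{
    using ProgressCallback = std::function<void(std::size_t rows, std::size_t bytes)>;
    ProgressCallback progress_callback;

    void setProgressCallback(ProgressCallback callback) { progress_callback = callback; }

    /// invoked once per block read from the source part, with that block's counts
    void onBlockRead(std::size_t rows, std::size_t bytes)
    {
        if (progress_callback)
            progress_callback(rows, bytes);
    }
};

int main()
{
    ToyMergeEntry entry;
    ToyInputStream input;
    /// accumulate read progress on the shared merge entry, as each source stream does above
    input.setProgressCallback([&entry] (std::size_t rows, std::size_t bytes)
    {
        entry.rows_read += rows;
        entry.bytes_read_uncompressed += bytes;
    });
    input.onBlockRead(8192, 1 << 20);
    input.onBlockRead(8192, 1 << 20);
    /// entry.rows_read == 16384, entry.bytes_read_uncompressed == 2097152
}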

View File

@@ -15,11 +15,11 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name, const Context
{ "table", new DataTypeString },
{ "num_parts", new DataTypeUInt64 },
{ "result_part_name", new DataTypeString },
{ "total_size_bytes", new DataTypeUInt64 },
{ "total_size_bytes_compressed", new DataTypeUInt64 },
{ "total_size_marks", new DataTypeUInt64 },
{ "bytes_read", new DataTypeUInt64 },
{ "bytes_read_uncompressed", new DataTypeUInt64 },
{ "rows_read", new DataTypeUInt64 },
{ "bytes_written", new DataTypeUInt64 },
{ "bytes_written_uncompressed", new DataTypeUInt64 },
{ "rows_written", new DataTypeUInt64 }
}
{
@@ -37,51 +37,45 @@ BlockInputStreams StorageSystemMerges::read(
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block;
ColumnWithNameAndType col_database{new ColumnString, new DataTypeString, "database"};
ColumnWithNameAndType col_table{new ColumnString, new DataTypeString, "table"};
ColumnWithNameAndType col_num_parts{new ColumnUInt64, new DataTypeUInt64, "num_parts"};
ColumnWithNameAndType col_result_part_name{new ColumnString, new DataTypeString, "result_part_name"};
ColumnWithNameAndType col_total_size_bytes{new ColumnUInt64, new DataTypeUInt64, "total_size_bytes"};
ColumnWithNameAndType col_total_size_bytes_compressed{new ColumnUInt64, new DataTypeUInt64, "total_size_bytes_compressed"};
ColumnWithNameAndType col_total_size_marks{new ColumnUInt64, new DataTypeUInt64, "total_size_marks"};
ColumnWithNameAndType col_bytes_read{new ColumnUInt64, new DataTypeUInt64, "bytes_read"};
ColumnWithNameAndType col_bytes_read_uncompressed{new ColumnUInt64, new DataTypeUInt64, "bytes_read_uncompressed"};
ColumnWithNameAndType col_rows_read{new ColumnUInt64, new DataTypeUInt64, "rows_read"};
ColumnWithNameAndType col_bytes_written{new ColumnUInt64, new DataTypeUInt64, "bytes_written"};
ColumnWithNameAndType col_bytes_written_uncompressed{new ColumnUInt64, new DataTypeUInt64, "bytes_written_uncompressed"};
ColumnWithNameAndType col_rows_written{new ColumnUInt64, new DataTypeUInt64, "rows_written"};
for (const auto & merge : context.getMergeList().get())
{
const auto bytes_read = merge.bytes_read;
const auto rows_read = merge.rows_read;
const auto bytes_written = merge.bytes_written;
const auto rows_written = merge.rows_written;
col_database.column->insert(merge.database);
col_table.column->insert(merge.table);
col_num_parts.column->insert(merge.num_parts);
col_result_part_name.column->insert(merge.result_part_name);
col_total_size_bytes.column->insert(merge.total_size_bytes);
col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed);
col_total_size_marks.column->insert(merge.total_size_marks);
col_bytes_read.column->insert(bytes_read);
col_rows_read.column->insert(rows_read);
col_bytes_written.column->insert(bytes_written);
col_rows_written.column->insert(rows_written);
col_bytes_read_uncompressed.column->insert(merge.bytes_read_uncompressed);
col_rows_read.column->insert(merge.rows_read);
col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed);
col_rows_written.column->insert(merge.rows_written);
}
block.insert(col_database);
block.insert(col_num_parts);
block.insert(col_table);
block.insert(col_total_size_bytes);
block.insert(col_result_part_name);
block.insert(col_bytes_read);
block.insert(col_total_size_marks);
block.insert(col_bytes_written);
block.insert(col_rows_read);
block.insert(col_rows_written);
Block block{
col_database,
col_num_parts,
col_table,
col_total_size_bytes_compressed,
col_result_part_name,
col_bytes_read_uncompressed,
col_total_size_marks,
col_bytes_written_uncompressed,
col_rows_read,
col_rows_written
};
return BlockInputStreams{1, new OneBlockInputStream{block}};
}
}

View File

@@ -11,15 +11,17 @@ namespace DB
StorageSystemProcesses::StorageSystemProcesses(const std::string & name_, const Context & context_)
: name(name_), context(context_)
, columns{
{ "user", new DataTypeString },
{ "address", new DataTypeString },
{ "elapsed", new DataTypeFloat64 },
{ "rows_read", new DataTypeUInt64 },
{ "bytes_read", new DataTypeUInt64 },
{ "memory_usage", new DataTypeUInt64 },
{ "query", new DataTypeString },
{ "query_id", new DataTypeString }
}
{
columns.push_back(NameAndTypePair("user", new DataTypeString));
columns.push_back(NameAndTypePair("address", new DataTypeString));
columns.push_back(NameAndTypePair("elapsed", new DataTypeFloat64));
columns.push_back(NameAndTypePair("rows_read", new DataTypeUInt64));
columns.push_back(NameAndTypePair("bytes_read", new DataTypeUInt64));
columns.push_back(NameAndTypePair("memory_usage", new DataTypeUInt64));
columns.push_back(NameAndTypePair("query", new DataTypeString));
columns.push_back(NameAndTypePair("query_id", new DataTypeString));
}
StoragePtr StorageSystemProcesses::create(const std::string & name_, const Context & context_)
@@ -35,73 +37,41 @@ BlockInputStreams StorageSystemProcesses::read(
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block;
ColumnWithNameAndType col_user{new ColumnString, new DataTypeString, "user"};
ColumnWithNameAndType col_address{new ColumnString, new DataTypeString, "address"};
ColumnWithNameAndType col_elapsed{new ColumnFloat64, new DataTypeFloat64, "elapsed"};
ColumnWithNameAndType col_rows_read{new ColumnUInt64, new DataTypeUInt64, "rows_read"};
ColumnWithNameAndType col_bytes_read{new ColumnUInt64, new DataTypeUInt64, "bytes_read"};
ColumnWithNameAndType col_memory_usage{new ColumnUInt64, new DataTypeUInt64, "memory_usage"};
ColumnWithNameAndType col_query{new ColumnString, new DataTypeString, "query"};
ColumnWithNameAndType col_query_id{new ColumnString, new DataTypeString, "query_id"};
ColumnWithNameAndType col_user;
col_user.name = "user";
col_user.type = new DataTypeString;
col_user.column = new ColumnString;
block.insert(col_user);
ColumnWithNameAndType col_address;
col_address.name = "address";
col_address.type = new DataTypeString;
col_address.column = new ColumnString;
block.insert(col_address);
ColumnWithNameAndType col_elapsed;
col_elapsed.name = "elapsed";
col_elapsed.type = new DataTypeFloat64;
col_elapsed.column = new ColumnFloat64;
block.insert(col_elapsed);
ColumnWithNameAndType col_rows_read;
col_rows_read.name = "rows_read";
col_rows_read.type = new DataTypeUInt64;
col_rows_read.column = new ColumnUInt64;
block.insert(col_rows_read);
ColumnWithNameAndType col_bytes_read;
col_bytes_read.name = "bytes_read";
col_bytes_read.type = new DataTypeUInt64;
col_bytes_read.column = new ColumnUInt64;
block.insert(col_bytes_read);
ColumnWithNameAndType col_memory_usage;
col_memory_usage.name = "memory_usage";
col_memory_usage.type = new DataTypeUInt64;
col_memory_usage.column = new ColumnUInt64;
block.insert(col_memory_usage);
ColumnWithNameAndType col_query;
col_query.name = "query";
col_query.type = new DataTypeString;
col_query.column = new ColumnString;
block.insert(col_query);
ColumnWithNameAndType col_query_id;
col_query_id.name = "query_id";
col_query_id.type = new DataTypeString;
col_query_id.column = new ColumnString;
block.insert(col_query_id);
ProcessList::Containter list = context.getProcessList().get();
for (ProcessList::Containter::const_iterator it = list.begin(); it != list.end(); ++it)
for (const auto & process : context.getProcessList().get())
{
size_t rows_read = it->rows_processed;
size_t bytes_read = it->bytes_processed;
col_user.column->insert(it->user);
col_address.column->insert(it->ip_address.toString());
col_elapsed.column->insert(it->watch.elapsedSeconds());
const size_t rows_read = process.rows_processed;
const size_t bytes_read = process.bytes_processed;
col_user.column->insert(process.user);
col_address.column->insert(process.ip_address.toString());
col_elapsed.column->insert(process.watch.elapsedSeconds());
col_rows_read.column->insert(rows_read);
col_bytes_read.column->insert(bytes_read);
col_memory_usage.column->insert(UInt64(it->memory_tracker.get()));
col_query.column->insert(it->query);
col_query_id.column->insert(it->query_id);
col_memory_usage.column->insert(static_cast<UInt64>(process.memory_tracker.get()));
col_query.column->insert(process.query);
col_query_id.column->insert(process.query_id);
}
Block block{
col_user,
col_address,
col_elapsed,
col_rows_read,
col_bytes_read,
col_memory_usage,
col_query,
col_query_id
};
return BlockInputStreams(1, new OneBlockInputStream(block));
}