Added method "getHeader" in IBlockInputStream [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-01-09 04:51:08 +03:00
parent fe880d73e7
commit 5081f276d1
17 changed files with 106 additions and 52 deletions

View File

@ -61,38 +61,29 @@ size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t multiplier)
{
/// We will not display the wrapper of the block stream in the AsynchronousBlockInputStream in the tree.
if (getName() != "Asynchronous")
ostr << String(indent, ' ') << getName();
if (multiplier > 1)
ostr << " × " << multiplier;
ostr << std::endl;
++indent;
/// If the subtree is repeated several times, then we output it once with the multiplier.
using Multipliers = std::map<String, size_t>;
Multipliers multipliers;
for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it)
++multipliers[(*it)->getTreeID()];
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
{
ostr << String(indent, ' ') << getName();
if (multiplier > 1)
ostr << " × " << multiplier;
ostr << std::endl;
++indent;
/// If the subtree is repeated several times, then we output it once with the multiplier.
using Multipliers = std::map<String, size_t>;
Multipliers multipliers;
for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it)
++multipliers[(*it)->getTreeID()];
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
String id = (*it)->getTreeID();
size_t & subtree_multiplier = multipliers[id];
if (subtree_multiplier != 0) /// Already printed subtrees are marked with zero in the array of multipliers.
{
String id = (*it)->getTreeID();
size_t & subtree_multiplier = multipliers[id];
if (subtree_multiplier != 0) /// Already printed subtrees are marked with zero in the array of multipliers.
{
(*it)->dumpTree(ostr, indent, subtree_multiplier);
subtree_multiplier = 0;
}
(*it)->dumpTree(ostr, indent, subtree_multiplier);
subtree_multiplier = 0;
}
}
else
{
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
(*it)->dumpTree(ostr, indent, multiplier);
}
}

View File

@ -120,6 +120,20 @@ Block ITableDeclaration::getSampleBlockNonMaterialized() const
}
Block ITableDeclaration::getSampleBlockForColumns(const Names & column_names) const
{
Block res;
for (const auto & name : column_names)
{
auto col = getColumn(name);
res.insert({ col.type->createColumn(), col.type, name });
}
return res;
}
static std::string listOfColumns(const NamesAndTypesList & available_columns)
{
std::stringstream s;

View File

@ -59,6 +59,7 @@ public:
*/
Block getSampleBlock() const;
Block getSampleBlockNonMaterialized() const;
Block getSampleBlockForColumns(const Names & column_names) const;
/** Verify that all the requested names are in the table and are set correctly.
* (the list of names is not empty and the names do not repeat)

View File

@ -31,10 +31,7 @@ public:
~MergeTreeBaseBlockInputStream() override;
Block getHeader() override { return storage.getSampleBlock(); };
protected:
Block readImpl() override final;
/// Creates new this->task, and initilizes readers
@ -48,7 +45,6 @@ protected:
void injectVirtualColumns(Block & block);
protected:
MergeTreeData & storage;
ExpressionActionsPtr prewhere_actions;

View File

@ -62,6 +62,13 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
setTotalRowsApprox(total_rows);
}
Block MergeTreeBlockInputStream::getHeader()
{
return storage.getSampleBlockForColumns(ordered_names);
}
String MergeTreeBlockInputStream::getID() const
{
std::stringstream res;

View File

@ -40,6 +40,8 @@ public:
String getID() const override;
Block getHeader() override;
/// Closes readers and unlock part locks
void finish();

View File

@ -117,6 +117,12 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
}
Block MergeTreeReadPool::getHeader() const
{
return data.getSampleBlockForColumns(column_names);
}
void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
{
if (backoff_settings.min_read_latency_ms == 0 || do_not_steal_tasks)

View File

@ -79,6 +79,8 @@ public:
*/
void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info);
Block getHeader() const;
private:
std::vector<size_t> fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,

View File

@ -38,6 +38,12 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
}
Block MergeTreeThreadBlockInputStream::getHeader()
{
return pool->getHeader();
};
String MergeTreeThreadBlockInputStream::getID() const
{
std::stringstream res;

View File

@ -1,5 +1,5 @@
#pragma once
#include "MergeTreeBaseBlockInputStream.h"
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
namespace DB
@ -34,6 +34,8 @@ public:
~MergeTreeThreadBlockInputStream() override;
Block getHeader() override;
protected:
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool getNewTask() override;

View File

@ -88,7 +88,7 @@ public:
return res.str();
}
Block getHeader() override { return storage.getSampleBlock(); };
Block getHeader() override { return storage.getSampleBlockForColumns(column_names); };
protected:
Block readImpl() override

View File

@ -93,7 +93,6 @@ StorageFile::StorageFile(
class StorageFileBlockInputStream : public IProfilingBlockInputStream
{
public:
StorageFileBlockInputStream(StorageFile & storage_, const Context & context, size_t max_block_size)
: storage(storage_)
{
@ -157,7 +156,7 @@ public:
return reader->read();
}
Block getHeader() override { return storage.getSampleBlock(); };
Block getHeader() override { return reader->getHeader(); };
void readPrefixImpl() override
{

View File

@ -176,7 +176,7 @@ public:
return reader->read();
}
Block getHeader() override { return storage.getSampleBlock(); };
Block getHeader() override { return reader->getHeader(); };
void readPrefixImpl() override
{

View File

@ -73,7 +73,15 @@ public:
return res.str();
}
Block getHeader() override { return storage.getSampleBlock(); };
Block getHeader() override
{
Block res;
for (const auto & name_type : columns)
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
return res;
};
protected:
Block readImpl() override;

View File

@ -37,7 +37,7 @@ public:
return res.str();
}
Block getHeader() override { return storage.getSampleBlock(); };
Block getHeader() override { return storage.getSampleBlockForColumns(column_names); };
protected:
Block readImpl() override

View File

@ -63,23 +63,21 @@ public:
return s.str();
}
Block getHeader() override { return storage.getSampleBlock(); };
Block getHeader() override
{
start();
if (block_in)
return block_in->getHeader();
else
return {};
};
protected:
Block readImpl() override
{
Block res;
if (!started)
{
started = true;
data_in.emplace(
storage.full_path() + "data.bin", 0, 0,
std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size), Poco::File(storage.full_path() + "data.bin").getSize()));
block_in.emplace(*data_in, 0, true, index_begin, index_end);
}
start();
if (block_in)
{
@ -112,6 +110,20 @@ private:
bool started = false;
std::optional<CompressedReadBufferFromFile> data_in;
std::optional<NativeBlockInputStream> block_in;
void start()
{
if (!started)
{
started = true;
data_in.emplace(
storage.full_path() + "data.bin", 0, 0,
std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size), Poco::File(storage.full_path() + "data.bin").getSize()));
block_in.emplace(*data_in, 0, true, index_begin, index_end);
}
}
};

View File

@ -61,7 +61,15 @@ public:
String getID() const override;
Block getHeader() override { return storage.getSampleBlock(); };
Block getHeader() override
{
Block res;
for (const auto & name_type : columns)
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
return res;
};
protected:
Block readImpl() override;