Merge branch 'master' into fix_expressions_in_metadata

Alexander Tokmakov 2020-02-22 03:17:15 +03:00
commit d6205fa4fa
160 changed files with 1882 additions and 887 deletions

View File

@ -16,6 +16,7 @@ set (SRCS
setTerminalEcho.cpp
shift10.cpp
sleep.cpp
terminalColors.cpp
)
if (ENABLE_REPLXX)

View File

@ -0,0 +1,49 @@
#include <string>
#include <common/terminalColors.h>
std::string setColor(UInt64 hash)
{
/// Make a random RGB color that has constant brightness.
/// https://en.wikipedia.org/wiki/YCbCr
/// Note that this is darker than the middle relative luminance, see "Gamma_correction" and "Luma_(video)".
/// It still looks awesome.
UInt8 y = 128;
UInt8 cb = hash % 256;
UInt8 cr = hash / 256 % 256;
UInt8 r = std::max(0.0, std::min(255.0, y + 1.402 * (cr - 128)));
UInt8 g = std::max(0.0, std::min(255.0, y - 0.344136 * (cb - 128) - 0.714136 * (cr - 128)));
UInt8 b = std::max(0.0, std::min(255.0, y + 1.772 * (cb - 128)));
/// ANSI escape sequence to set 24-bit foreground font color in terminal.
return "\033[38;2;" + std::to_string(r) + ";" + std::to_string(g) + ";" + std::to_string(b) + "m";
}
const char * setColorForLogPriority(int priority)
{
if (priority < 1 || priority > 8)
return "";
static const char * colors[] =
{
"",
"\033[1;41m", /// Fatal
"\033[7;31m", /// Critical
"\033[1;31m", /// Error
"\033[0;31m", /// Warning
"\033[0;33m", /// Notice
"\033[1m", /// Information
"", /// Debug
"\033[2m", /// Trace
};
return colors[priority];
}
const char * resetColor()
{
return "\033[0m";
}

View File

@ -0,0 +1,15 @@
#include <string>
#include <common/Types.h>
/** Set color in terminal based on 64-bit hash value.
* It can be used to choose some random color deterministically based on some other value.
* Hash value should be uniformly distributed.
*/
std::string setColor(UInt64 hash);
/** Set color for logger priority value. */
const char * setColorForLogPriority(int priority);
/** Undo changes made by the functions above. */
const char * resetColor();
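A minimal usage sketch of these helpers (a hypothetical standalone program, assuming common/terminalColors.h is on the include path; the hash value is an arbitrary example):
#include <iostream>
#include <string>
#include <common/terminalColors.h>
int main()
{
    /// Deterministically color an identifier by an example hash value.
    UInt64 example_hash = 0x1234;
    std::cout << setColor(example_hash) << "some-query-id" << resetColor() << ' ';
    /// Priority 3 corresponds to "Error" in the table above.
    std::cout << setColorForLogPriority(3) << "Error" << resetColor() << std::endl;
    return 0;
}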

View File

@ -133,12 +133,13 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
split->addChannel(log);
}
bool is_tty = isatty(STDIN_FILENO) || isatty(STDERR_FILENO);
bool should_log_to_console = isatty(STDIN_FILENO) || isatty(STDERR_FILENO);
bool color_logs_by_default = isatty(STDERR_FILENO);
if (config.getBool("logger.console", false)
|| (!config.hasProperty("logger.console") && !is_daemon && is_tty))
|| (!config.hasProperty("logger.console") && !is_daemon && should_log_to_console))
{
bool color_enabled = config.getBool("logger.color_terminal", true) && is_tty;
bool color_enabled = config.getBool("logger.color_terminal", color_logs_by_default);
Poco::AutoPtr<OwnPatternFormatter> pf = new OwnPatternFormatter(this, OwnPatternFormatter::ADD_NOTHING, color_enabled);
Poco::AutoPtr<DB::OwnFormattingChannel> log = new DB::OwnFormattingChannel(pf, new Poco::ConsoleChannel);

View File

@ -9,57 +9,10 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/CurrentThread.h>
#include <common/getThreadId.h>
#include <common/terminalColors.h>
#include "Loggers.h"
static std::string setColor(UInt64 num)
{
/// Make a random RGB color that has constant brightness.
/// https://en.wikipedia.org/wiki/YCbCr
/// Note that this is darker than the middle relative luminance, see "Gamma_correction" and "Luma_(video)".
/// It still looks awesome.
UInt8 y = 128;
UInt8 cb = num % 256;
UInt8 cr = num / 256 % 256;
UInt8 r = std::max(0.0, std::min(255.0, y + 1.402 * (cr - 128)));
UInt8 g = std::max(0.0, std::min(255.0, y - 0.344136 * (cb - 128) - 0.714136 * (cr - 128)));
UInt8 b = std::max(0.0, std::min(255.0, y + 1.772 * (cb - 128)));
/// ANSI escape sequence to set 24-bit foreground font color in terminal.
return "\033[38;2;" + DB::toString(r) + ";" + DB::toString(g) + ";" + DB::toString(b) + "m";
}
static const char * setColorForLogPriority(int priority)
{
if (priority < 1 || priority > 8)
return "";
static const char * colors[] =
{
"",
"\033[1;41m", /// Fatal
"\033[7;31m", /// Critical
"\033[1;31m", /// Error
"\033[0;31m", /// Warning
"\033[0;33m", /// Notice
"\033[1m", /// Information
"", /// Debug
"\033[2m", /// Trace
};
return colors[priority];
}
static const char * resetColor()
{
return "\033[0m";
}
OwnPatternFormatter::OwnPatternFormatter(const Loggers * loggers_, OwnPatternFormatter::Options options_, bool color_)
: Poco::PatternFormatter(""), loggers(loggers_), options(options_), color(color_)
{

View File

@ -258,16 +258,6 @@ endif ()
add_subdirectory(src/Common/ZooKeeper)
add_subdirectory(src/Common/Config)
# It's Ok to avoid tracking of unresolved symbols for static linkage because
# they will be resolved at link time nevertheless.
function(target_ignore_unresolved_symbols target)
if (OS_DARWIN)
target_link_libraries (${target} PRIVATE -Wl,-undefined,dynamic_lookup)
else()
target_link_libraries (${target} PRIVATE -Wl,--unresolved-symbols=ignore-all)
endif()
endfunction()
set (all_modules)
macro(add_object_library name common_path)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
@ -276,7 +266,7 @@ macro(add_object_library name common_path)
list (APPEND all_modules ${name})
add_headers_and_sources(${name} ${common_path})
add_library(${name} SHARED ${${name}_sources} ${${name}_headers})
target_ignore_unresolved_symbols(${name})
target_link_libraries (${name} PRIVATE -Wl,--unresolved-symbols=ignore-all)
endif ()
endmacro()
@ -307,7 +297,6 @@ add_object_library(clickhouse_processors_sources src/Processors/Sources)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_library (dbms STATIC ${dbms_headers} ${dbms_sources})
set (all_modules dbms)
target_ignore_unresolved_symbols (dbms)
else()
add_library (dbms SHARED ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PUBLIC ${all_modules})
@ -562,6 +551,13 @@ endif()
if (USE_JEMALLOC)
dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # used in Interpreters/AsynchronousMetrics.cpp
target_include_directories (clickhouse_new_delete SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR})
if(NOT MAKE_STATIC_LIBRARIES AND ${JEMALLOC_LIBRARIES} MATCHES "${CMAKE_STATIC_LIBRARY_SUFFIX}$")
# mallctl in dbms/src/Interpreters/AsynchronousMetrics.cpp
# Actually we link JEMALLOC to almost all libraries.
# This is just a hotfix for an uninvestigated problem.
target_link_libraries(clickhouse_interpreters PRIVATE ${JEMALLOC_LIBRARIES})
endif()
endif ()
dbms_target_include_directories (PUBLIC ${DBMS_INCLUDE_DIR})

View File

@ -1,11 +1,11 @@
# These strings are autochanged from release_lib.sh:
set(VERSION_REVISION 54432)
set(VERSION_REVISION 54433)
set(VERSION_MAJOR 20)
set(VERSION_MINOR 2)
set(VERSION_MINOR 3)
set(VERSION_PATCH 1)
set(VERSION_GITHASH 4b9acaaa9099e71c36e5c818031149c5cba2bbdb)
set(VERSION_DESCRIBE v20.2.1.1-prestable)
set(VERSION_STRING 20.2.1.1)
set(VERSION_GITHASH d93e7e5ccf8fcca724e917581b00bf569947fff9)
set(VERSION_DESCRIBE v20.3.1.1-prestable)
set(VERSION_STRING 20.3.1.1)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -130,7 +130,8 @@ private:
bool echo_queries = false; /// Print queries before execution in batch mode.
bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode.
bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode.
bool stdin_is_not_tty = false; /// stdin is not a terminal.
bool stdin_is_a_tty = false; /// stdin is a terminal.
bool stdout_is_a_tty = false; /// stdout is a terminal.
uint16_t terminal_width = 0; /// Terminal width is needed to render progress bar.
@ -378,7 +379,7 @@ private:
/// The value of the option is used as the text of query (or of multiple queries).
/// If stdin is not a terminal, INSERT data for the first query is read from it.
/// - stdin is not a terminal. In this case queries are read from it.
if (stdin_is_not_tty || config().has("query"))
if (!stdin_is_a_tty || config().has("query"))
is_interactive = false;
std::cout << std::fixed << std::setprecision(3);
@ -874,7 +875,7 @@ private:
if (!select && !external_tables.empty())
throw Exception("External tables could be sent only with select query", ErrorCodes::BAD_ARGUMENTS);
std::vector<ExternalTableData> data;
std::vector<ExternalTableDataPtr> data;
for (auto & table : external_tables)
data.emplace_back(table.getData(context));
@ -910,7 +911,7 @@ private:
? query.substr(0, parsed_insert_query.data - query.data())
: query;
if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof())))
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(connection_parameters.timeouts, query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
@ -1332,7 +1333,7 @@ private:
}
}
logs_out_stream = std::make_shared<InternalTextLogsRowOutputStream>(*wb);
logs_out_stream = std::make_shared<InternalTextLogsRowOutputStream>(*wb, stdout_is_a_tty);
logs_out_stream->writePrefix();
}
}
@ -1643,9 +1644,10 @@ public:
}
}
stdin_is_not_tty = !isatty(STDIN_FILENO);
stdin_is_a_tty = isatty(STDIN_FILENO);
stdout_is_a_tty = isatty(STDOUT_FILENO);
if (!stdin_is_not_tty)
if (stdin_is_a_tty)
terminal_width = getTerminalWidth();
namespace po = boost::program_options;

View File

@ -22,6 +22,9 @@
#include <Common/config_version.h>
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Pipe.h>
#include <Processors/ISink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Common/config.h>
#if USE_POCO_NETSSL
@ -534,6 +537,45 @@ void Connection::sendScalarsData(Scalars & data)
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
namespace
{
/// Sink which sends data for external table.
class ExternalTableDataSink : public ISink
{
public:
using OnCancell = std::function<void()>;
ExternalTableDataSink(Block header, Connection & connection_, ExternalTableData & table_data_, OnCancell callback)
: ISink(std::move(header)), connection(connection_), table_data(table_data_),
on_cancell(std::move(callback))
{}
String getName() const override { return "ExternalTableSink"; }
size_t getNumReadRows() const { return num_rows; }
protected:
void consume(Chunk chunk) override
{
if (table_data.is_cancelled)
{
on_cancell();
return;
}
num_rows += chunk.getNumRows();
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
connection.sendData(block, table_data.table_name);
}
private:
Connection & connection;
ExternalTableData & table_data;
OnCancell on_cancell;
size_t num_rows = 0;
};
}
void Connection::sendExternalTablesData(ExternalTablesData & data)
{
@ -553,13 +595,24 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
for (auto & elem : data)
{
elem.first->readPrefix();
while (Block block = elem.first->read())
{
rows += block.rows();
sendData(block, elem.second);
}
elem.first->readSuffix();
PipelineExecutorPtr executor;
auto on_cancel = [& executor]() { executor->cancel(); };
auto sink = std::make_shared<ExternalTableDataSink>(elem->pipe->getHeader(), *this, *elem, std::move(on_cancel));
DB::connect(elem->pipe->getPort(), sink->getPort());
auto processors = std::move(*elem->pipe).detachProcessors();
processors.push_back(sink);
executor = std::make_shared<PipelineExecutor>(processors);
executor->execute(/*num_threads = */ 1);
auto read_rows = sink->getNumReadRows();
rows += read_rows;
/// If table is empty, send empty block with name.
if (read_rows == 0)
sendData(sink->getPort().getHeader(), elem->table_name);
}
/// Send empty block, which means end of data transfer.

View File

@ -30,11 +30,20 @@ namespace DB
{
class ClientInfo;
class Pipe;
/// The stream of blocks reading from the table and its name
using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>;
/// Vector of pairs describing tables
using ExternalTablesData = std::vector<ExternalTableData>;
/// Struct which represents data we are going to send for external table.
struct ExternalTableData
{
/// Pipe of data from the table.
std::unique_ptr<Pipe> pipe;
std::string table_name;
/// Flag indicating that reading should be stopped.
std::atomic_bool is_cancelled = false;
};
using ExternalTableDataPtr = std::unique_ptr<ExternalTableData>;
using ExternalTablesData = std::vector<ExternalTableDataPtr>;
class Connection;

View File

@ -346,18 +346,14 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
if (0 == size)
return this->create();
auto res = this->create();
typename Self::Container & res_data = res->getData();
res_data.reserve(offsets.back());
auto res = this->create(offsets.back());
IColumn::Offset prev_offset = 0;
auto it = res->getData().begin();
for (size_t i = 0; i < size; ++i)
{
size_t size_to_replicate = offsets[i] - prev_offset;
prev_offset = offsets[i];
for (size_t j = 0; j < size_to_replicate; ++j)
res_data.push_back(data[i]);
const auto span_end = res->getData().begin() + offsets[i];
for (; it < span_end; ++it)
*it = data[i];
}
return res;
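The offsets passed to replicate are cumulative row counts, so element i is repeated offsets[i] - offsets[i - 1] times; the new test added below exercises exactly this. A standalone sketch of the same semantics in plain C++ (a hypothetical helper, not part of this patch):
#include <cassert>
#include <cstdint>
#include <vector>
std::vector<uint8_t> replicateByOffsets(const std::vector<uint8_t> & data, const std::vector<uint64_t> & offsets)
{
    std::vector<uint8_t> res;
    res.reserve(offsets.empty() ? 0 : offsets.back());
    uint64_t prev_offset = 0;
    for (size_t i = 0; i < data.size(); ++i)
    {
        /// Element i is copied offsets[i] - prev_offset times.
        for (uint64_t j = prev_offset; j < offsets[i]; ++j)
            res.push_back(data[i]);
        prev_offset = offsets[i];
    }
    return res;
}
int main()
{
    /// {3, 2, 1} with offsets {1, 1, 5}: 3 once, 2 zero times, 1 four times.
    assert((replicateByOffsets({3, 2, 1}, {1, 1, 5}) == std::vector<uint8_t>{3, 1, 1, 1, 1}));
    return 0;
}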

View File

@ -186,3 +186,28 @@ TEST(column_unique, column_unique_unique_deserialize_from_arena_Nullable_String_
auto column = ColumnNullable::create(std::move(column_string), std::move(null_mask));
column_unique_unique_deserialize_from_arena_impl(*column, *data_type);
}
TEST(ColumnVector, correctness_of_replicate)
{
const auto column = ColumnUInt8::create();
column->insertValue(3);
column->insertValue(2);
column->insertValue(1);
const auto empty_column = column->replicate({0, 0, 0});
const auto empty_column_ptr = typeid_cast<const ColumnUInt8 *>(empty_column.get());
EXPECT_NE(empty_column_ptr, nullptr);
EXPECT_EQ(empty_column_ptr->size(), 0);
const auto new_column = column->replicate({1, 1, 5});
const auto new_column_ptr = typeid_cast<const ColumnUInt8 *>(new_column.get());
EXPECT_NE(new_column_ptr, nullptr);
EXPECT_EQ(new_column_ptr->size(), 5);
auto it = new_column_ptr->getData().cbegin();
for (const auto num : {3, 1, 1, 1, 1})
{
EXPECT_EQ(*it, num);
++it;
}
}

View File

@ -309,6 +309,9 @@ bool OptimizedRegularExpressionImpl<thread_safe>::match(const char * subject, si
if (is_trivial)
{
if (required_substring.empty())
return true;
if (is_case_insensitive)
return haystack_end != case_insensitive_substring_searcher->search(haystack, subject_size);
else
@ -343,6 +346,9 @@ bool OptimizedRegularExpressionImpl<thread_safe>::match(const char * subject, si
if (is_trivial)
{
if (required_substring.empty())
return true;
const UInt8 * pos;
if (is_case_insensitive)
pos = case_insensitive_substring_searcher->search(haystack, subject_size);
@ -402,6 +408,9 @@ unsigned OptimizedRegularExpressionImpl<thread_safe>::match(const char * subject
if (is_trivial)
{
if (required_substring.empty())
return 1;
const UInt8 * pos;
if (is_case_insensitive)
pos = case_insensitive_substring_searcher->search(haystack, subject_size);

View File

@ -9,8 +9,12 @@
#include <IO/LimitReadBuffer.h>
#include <Storages/StorageMemory.h>
#include <Poco/Net/MessageHeader.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <Core/ExternalTable.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
namespace DB
@ -22,12 +26,18 @@ namespace ErrorCodes
}
ExternalTableData BaseExternalTable::getData(const Context & context)
ExternalTableDataPtr BaseExternalTable::getData(const Context & context)
{
initReadBuffer();
initSampleBlock();
auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
return std::make_pair(std::make_shared<AsynchronousBlockInputStream>(input), name);
auto stream = std::make_shared<AsynchronousBlockInputStream>(input);
auto data = std::make_unique<ExternalTableData>();
data->table_name = name;
data->pipe = std::make_unique<Pipe>(std::make_shared<SourceFromInputStream>(std::move(stream)));
return data;
}
void BaseExternalTable::clean()
@ -156,22 +166,24 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
else
throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so.", ErrorCodes::BAD_ARGUMENTS);
ExternalTableData data = getData(context);
ExternalTableDataPtr data = getData(context);
/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
StoragePtr storage = StorageMemory::create(StorageID("_external", data.second), ColumnsDescription{columns}, ConstraintsDescription{});
StoragePtr storage = StorageMemory::create(StorageID("_external", data->table_name), ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
context.addExternalTable(data.second, storage);
context.addExternalTable(data->table_name, storage);
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);
/// Write data
data.first->readPrefix();
output->writePrefix();
while (Block block = data.first->read())
output->write(block);
data.first->readSuffix();
output->writeSuffix();
auto sink = std::make_shared<SinkToOutputStream>(std::move(output));
connect(data->pipe->getPort(), sink->getPort());
auto processors = std::move(*data->pipe).detachProcessors();
processors.push_back(std::move(sink));
auto executor = std::make_shared<PipelineExecutor>(processors);
executor->execute(/*num_threads = */ 1);
/// We are ready to receive the next file, for this we clear all the information received
clean();

View File

@ -57,7 +57,7 @@ public:
virtual void initReadBuffer() {}
/// Get the table data: the table name and a pipe with the table contents.
ExternalTableData getData(const Context & context);
ExternalTableDataPtr getData(const Context & context);
protected:
/// Clear all accumulated information

View File

@ -122,6 +122,7 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
/// Then the data about `rows_before_limit` can be in `RemoteBlockInputStream` (coming from a remote server).
BlockStreamProfileInfos remotes;
collectInfosForStreamsWithName("Remote", remotes);
collectInfosForStreamsWithName("TreeExecutor", remotes);
if (remotes.empty())
return;

View File

@ -19,33 +19,31 @@ namespace DB
/** A source of blocks from a shared vector of blocks
*/
class BlocksBlockInputStream : public IBlockInputStream
class BlocksSource : public SourceWithProgress
{
public:
/// Acquires shared ownership of the blocks vector
BlocksBlockInputStream(const std::shared_ptr<BlocksPtr> & blocks_ptr_, Block header_)
: blocks(*blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()), header(std::move(header_)) {}
BlocksSource(const std::shared_ptr<BlocksPtr> & blocks_ptr_, Block header)
: SourceWithProgress(std::move(header))
, blocks(*blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()) {}
String getName() const override { return "Blocks"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override
Chunk generate() override
{
if (it == end)
return Block();
return {};
Block res = *it;
++it;
return res;
return Chunk(res.getColumns(), res.rows());
}
private:
BlocksPtr blocks;
Blocks::iterator it;
const Blocks::iterator end;
Block header;
};
}

View File

@ -98,6 +98,10 @@ Block ConvertingBlockInputStream::readImpl()
return src;
Block res = header.cloneEmpty();
/// This is important because header.cloneEmpty() doesn't copy info about aggregation bucket.
/// Otherwise information in buckets may be lost (and aggregation will return wrong result).
res.info = src.info;
for (size_t res_pos = 0, size = conversion.size(); res_pos < size; ++res_pos)
{
const auto & src_elem = src.getByPosition(conversion[res_pos]);

View File

@ -2,10 +2,12 @@
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/typeid_cast.h>
#include <Common/HashTable/Hash.h>
#include <DataTypes/IDataType.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <IO/WriteHelpers.h>
#include <common/terminalColors.h>
namespace DB
@ -35,7 +37,11 @@ void InternalTextLogsRowOutputStream::write(const Block & block)
if (host_name.size)
{
writeCString("[", wb);
if (color)
writeString(setColor(StringRefHash()(host_name)), wb);
writeString(host_name, wb);
if (color)
writeCString(resetColor(), wb);
writeCString("] ", wb);
}
@ -51,21 +57,34 @@ void InternalTextLogsRowOutputStream::write(const Block & block)
writeChar('0' + ((microseconds / 10) % 10), wb);
writeChar('0' + ((microseconds / 1) % 10), wb);
UInt64 thread_id = array_thread_id[row_num];
writeCString(" [ ", wb);
if (color)
writeString(setColor(intHash64(thread_id)), wb);
writeIntText(thread_id, wb);
if (color)
writeCString(resetColor(), wb);
writeCString(" ]", wb);
auto query_id = column_query_id.getDataAt(row_num);
if (query_id.size)
{
writeCString(" {", wb);
if (color)
writeString(setColor(StringRefHash()(query_id)), wb);
writeString(query_id, wb);
if (color)
writeCString(resetColor(), wb);
writeCString("}", wb);
}
UInt64 thread_id = array_thread_id[row_num];
writeCString(" [ ", wb);
writeIntText(thread_id, wb);
writeCString(" ] <", wb);
Int8 priority = array_priority[row_num];
writeCString(" <", wb);
if (color)
writeCString(setColorForLogPriority(priority), wb);
writeString(InternalTextLogsQueue::getPriorityName(priority), wb);
if (color)
writeCString(resetColor(), wb);
writeCString("> ", wb);
auto source = column_source.getDataAt(row_num);

View File

@ -12,8 +12,7 @@ namespace DB
class InternalTextLogsRowOutputStream : public IBlockOutputStream
{
public:
InternalTextLogsRowOutputStream(WriteBuffer & buf_out) : wb(buf_out) {}
InternalTextLogsRowOutputStream(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
Block getHeader() const override;
@ -25,8 +24,8 @@ public:
}
private:
WriteBuffer & wb;
bool color;
};
}

View File

@ -7,8 +7,12 @@
#include <Interpreters/castColumn.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/IStorage.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Pipe.h>
#include <IO/ConnectionTimeouts.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
namespace DB
@ -112,7 +116,7 @@ void RemoteBlockInputStream::cancel(bool kill)
/// Stop sending external data.
for (auto & vec : external_tables_data)
for (auto & elem : vec)
elem.first->cancel(kill);
elem->is_cancelled = true;
}
if (!isQueryPending() || hasThrownException())
@ -142,12 +146,26 @@ void RemoteBlockInputStream::sendExternalTables()
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
BlockInputStreams input = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(std::make_shared<OneBlockInputStream>(cur->getSampleBlock()), table.first));
Pipes pipes;
pipes = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
auto data = std::make_unique<ExternalTableData>();
data->table_name = table.first;
if (pipes.empty())
data->pipe = std::make_unique<Pipe>(std::make_shared<SourceFromSingleChunk>(cur->getSampleBlock(), Chunk()));
else if (pipes.size() == 1)
data->pipe = std::make_unique<Pipe>(std::move(pipes.front()));
else
res.push_back(std::make_pair(input[0], table.first));
{
auto concat = std::make_shared<ConcatProcessor>(pipes.front().getHeader(), pipes.size());
data->pipe = std::make_unique<Pipe>(std::move(pipes), std::move(concat));
}
res.emplace_back(std::move(data));
}
external_tables_data.push_back(std::move(res));
}

View File

@ -1,12 +1,29 @@
#include <random>
#include <Common/thread_local_rng.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Pipe.h>
#include "narrowBlockInputStreams.h"
namespace DB
{
namespace
{
using Distribution = std::vector<size_t>;
Distribution getDistribution(size_t from, size_t to)
{
Distribution distribution(from);
for (size_t i = 0; i < from; ++i)
distribution[i] = i % to;
std::shuffle(distribution.begin(), distribution.end(), thread_local_rng);
return distribution;
}
}
BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width)
{
size_t size = inputs.size();
@ -15,13 +32,7 @@ BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t wid
std::vector<BlockInputStreams> partitions(width);
using Distribution = std::vector<size_t>;
Distribution distribution(size);
for (size_t i = 0; i < size; ++i)
distribution[i] = i % width;
std::shuffle(distribution.begin(), distribution.end(), thread_local_rng);
auto distribution = getDistribution(size, width);
for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].push_back(inputs[i]);
@ -33,4 +44,29 @@ BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t wid
return res;
}
Pipes narrowPipes(Pipes pipes, size_t width)
{
size_t size = pipes.size();
if (size <= width)
return pipes;
std::vector<Pipes> partitions(width);
auto distribution = getDistribution(size, width);
for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].emplace_back(std::move(pipes[i]));
Pipes res;
res.reserve(width);
for (size_t i = 0; i < width; ++i)
{
auto processor = std::make_shared<ConcatProcessor>(partitions[i].at(0).getHeader(), partitions[i].size());
res.emplace_back(std::move(partitions[i]), std::move(processor));
}
return res;
}
}
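getDistribution assigns the `from` sources to the `to` output slots round-robin and then shuffles the assignment, so no slot systematically receives the sources with more data. A standalone sketch of the same idea in plain standard C++ (not ClickHouse code):
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <random>
#include <vector>
int main()
{
    const size_t from = 5;
    const size_t to = 2;
    /// Round-robin: source i initially goes to slot i % to, i.e. {0, 1, 0, 1, 0}.
    std::vector<size_t> distribution(from);
    for (size_t i = 0; i < from; ++i)
        distribution[i] = i % to;
    /// Shuffle so the source-to-slot mapping does not follow a fixed pattern.
    std::mt19937 rng(std::random_device{}());
    std::shuffle(distribution.begin(), distribution.end(), rng);
    for (size_t slot : distribution)
        std::cout << slot << ' ';
    std::cout << '\n';
    return 0;
}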

View File

@ -6,6 +6,9 @@
namespace DB
{
class Pipe;
using Pipes = std::vector<Pipe>;
/** If the number of sources of `inputs` is greater than `width`,
* then glues the sources to each other (using ConcatBlockInputStream),
* so that the number of sources becomes no more than `width`.
@ -14,5 +17,6 @@ namespace DB
* (to avoid overweighting if the distribution of the amount of data in different sources is subject to some pattern)
*/
BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width);
Pipes narrowPipes(Pipes pipes, size_t width);
}

View File

@ -20,6 +20,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
int main(int argc, char ** argv)
@ -54,7 +55,7 @@ try
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in;
in = table->read(column_names, {}, context, stage, 8192, 1)[0];
in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, context, stage, 8192, 1)[0]));
in = std::make_shared<ExpressionBlockInputStream>(in, expression);
in = std::make_shared<LimitBlockInputStream>(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));

View File

@ -22,6 +22,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
int main(int argc, char ** argv)
@ -58,7 +59,7 @@ try
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in = table->read(column_names, {}, context, stage, 8192, 1)[0];
BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, context, stage, 8192, 1)[0]));
in = std::make_shared<FilterBlockInputStream>(in, expression, "equals(modulo(number, 3), 1)");
in = std::make_shared<LimitBlockInputStream>(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));

View File

@ -15,6 +15,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/loadMetadata.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
using namespace DB;
@ -36,10 +37,12 @@ try
StoragePtr table = context.getTable("default", "hits6");
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreams streams = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
auto pipes = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
BlockInputStreams streams(pipes.size());
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = std::make_shared<AsynchronousBlockInputStream>(streams[i]);
streams[i] = std::make_shared<AsynchronousBlockInputStream>(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipes[i])));
BlockInputStreamPtr stream = std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
stream = std::make_shared<LimitBlockInputStream>(stream, 10, 0);

View File

@ -11,6 +11,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
@ -106,7 +107,11 @@ bool DiskLocal::tryReserve(UInt64 bytes)
UInt64 DiskLocal::getTotalSpace() const
{
auto fs = getStatVFS(disk_path);
struct statvfs fs;
if (name == "default") /// for default disk we get space from path/data/
fs = getStatVFS(disk_path + "data/");
else
fs = getStatVFS(disk_path);
UInt64 total_size = fs.f_blocks * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
@ -117,7 +122,11 @@ UInt64 DiskLocal::getAvailableSpace() const
{
/// We use f_bavail, because part of the f_bfree space is
/// available to the superuser only and for system purposes
auto fs = getStatVFS(disk_path);
struct statvfs fs;
if (name == "default") /// for default disk we get space from path/data/
fs = getStatVFS(disk_path + "data/");
else
fs = getStatVFS(disk_path);
UInt64 total_size = fs.f_bavail * fs.f_bsize;
if (total_size < keep_free_space_bytes)
return 0;
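Both functions above rely on statvfs; the total size counts all blocks, while the available size uses f_bavail, which excludes the space reserved for the superuser (unlike f_bfree). A minimal POSIX-only sketch of the same accounting (hypothetical standalone program, probing "/" instead of the disk path):
#include <iostream>
#include <stdexcept>
#include <sys/statvfs.h>
int main()
{
    struct statvfs fs;
    if (statvfs("/", &fs) != 0)
        throw std::runtime_error("statvfs failed");
    /// Mirrors the computation above: block counts multiplied by the block size.
    unsigned long long total = static_cast<unsigned long long>(fs.f_blocks) * fs.f_bsize;
    unsigned long long available = static_cast<unsigned long long>(fs.f_bavail) * fs.f_bsize;
    std::cout << "total: " << total << " bytes, available: " << available << " bytes\n";
    return 0;
}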

View File

@ -11,6 +11,10 @@ struct Settings;
class Context;
class Cluster;
class Throttler;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
namespace ClusterProxy
{
@ -26,7 +30,8 @@ public:
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res) = 0;
const SelectQueryInfo & query_info,
Pipes & res) = 0;
};
}

View File

@ -12,7 +12,10 @@
#include <common/logger_useful.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <Processors/Pipe.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
namespace ProfileEvents
{
@ -67,12 +70,32 @@ SelectStreamFactory::SelectStreamFactory(
namespace
{
BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
Pipe createLocalStream(const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage, bool force_tree_shaped_pipeline)
{
checkStackSize();
InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
BlockInputStreamPtr stream = interpreter.execute().in;
if (force_tree_shaped_pipeline)
{
/// This flag means that pipeline must be tree-shaped,
/// so we can't enable processors for InterpreterSelectQuery here.
auto stream = interpreter.execute().in;
Pipe pipe(std::make_shared<SourceFromInputStream>(std::move(stream)));
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name, context));
return pipe;
}
auto pipeline = interpreter.executeWithProcessors();
pipeline.addSimpleTransform([&](const Block & source_header)
{
return std::make_shared<ConvertingTransform>(
source_header, header, ConvertingTransform::MatchColumnsMode::Name, context);
});
/** Materialization is needed, since from remote servers the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
@ -84,7 +107,7 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Block & he
*/
/// return std::make_shared<MaterializingBlockInputStream>(stream);
return std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
return std::move(pipeline).getPipe();
}
static String formattedAST(const ASTPtr & ast)
@ -102,15 +125,19 @@ void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String &, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
const SelectQueryInfo & query_info,
Pipes & res)
{
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals_port = processed_stage == QueryProcessingStage::Complete;
auto modified_query_ast = query_ast->clone();
if (has_virtual_shard_num_column)
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage));
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage, query_info.force_tree_shaped_pipeline));
};
String modified_query = formattedAST(modified_query_ast);
@ -122,7 +149,13 @@ void SelectStreamFactory::createForShard(
stream->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
if (add_totals_port)
source->addTotalsPort();
res.emplace_back(std::move(source));
};
const auto & settings = context.getSettingsRef();
@ -250,7 +283,7 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return createLocalStream(modified_query_ast, header, context, stage);
return std::make_shared<TreeExecutorBlockInputStream>(createLocalStream(modified_query_ast, header, context, stage, true));
else
{
std::vector<IConnectionPool::Entry> connections;
@ -263,7 +296,13 @@ void SelectStreamFactory::createForShard(
}
};
res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
auto lazy_stream = std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream);
auto source = std::make_shared<SourceFromInputStream>(std::move(lazy_stream), force_add_agg_info);
if (add_totals_port)
source->addTotalsPort();
res.emplace_back(std::move(source));
}
else
emplace_remote_stream();

View File

@ -35,7 +35,8 @@ public:
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res) override;
const SelectQueryInfo & query_info,
Pipes & res) override;
private:
const Block header;

View File

@ -6,6 +6,7 @@
#include <Interpreters/IInterpreter.h>
#include <Parsers/queryToString.h>
#include <Interpreters/ProcessList.h>
#include <Processors/Pipe.h>
namespace DB
@ -36,11 +37,11 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
return new_context;
}
BlockInputStreams executeQuery(
Pipes executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster,
const ASTPtr & query_ast, const Context & context, const Settings & settings)
const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info)
{
BlockInputStreams res;
Pipes res;
const std::string query = queryToString(query_ast);
@ -64,7 +65,7 @@ BlockInputStreams executeQuery(
throttler = user_level_throttler;
for (const auto & shard_info : cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, res);
return res;
}

View File

@ -9,6 +9,10 @@ namespace DB
struct Settings;
class Context;
class Cluster;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
namespace ClusterProxy
{
@ -22,9 +26,9 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
/// Execute a distributed query, creating a vector of Pipes, from which the result can be read.
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
/// (currently SELECT, DESCRIBE).
BlockInputStreams executeQuery(
Pipes executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster,
const ASTPtr & query_ast, const Context & context, const Settings & settings);
const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info);
}

View File

@ -296,7 +296,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
return;
}
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, context, subquery_depth + 1, {});
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, context, {}, query_options);
BlockIO res = interpreter_subquery->execute();
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true);
@ -596,7 +596,7 @@ void SelectQueryExpressionAnalyzer::makeSubqueryForJoin(const ASTTablesInSelectQ
for (auto & pr : required_columns_with_aliases)
original_columns.push_back(pr.first);
auto interpreter = interpretSubquery(join_element.table_expression, context, subquery_depth, original_columns);
auto interpreter = interpretSubquery(join_element.table_expression, context, original_columns, query_options);
subquery_for_set.makeSource(interpreter, std::move(required_columns_with_aliases));
}

View File

@ -228,11 +228,12 @@ public:
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
const Context & context_,
const NameSet & required_result_columns_ = {},
size_t subquery_depth_ = 0,
bool do_global_ = false)
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, subquery_depth_, do_global_)
, required_result_columns(required_result_columns_)
{}
bool do_global_ = false,
const SelectQueryOptions & options_ = {})
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, options_.subquery_depth, do_global_)
, required_result_columns(required_result_columns_), query_options(options_)
{
}
/// Does the expression have aggregate functions or a GROUP BY or HAVING section.
bool hasAggregation() const { return has_aggregation; }
@ -258,6 +259,7 @@ public:
private:
/// If non-empty, ignore all expressions not from this list.
NameSet required_result_columns;
SelectQueryOptions query_options;
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.

View File

@ -53,7 +53,8 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
else if (ast.getKind() == ASTExplainQuery::AnalyzedSyntax)
{
InterpreterSelectWithUnionQuery interpreter(ast.children.at(0), context,
SelectQueryOptions(QueryProcessingStage::FetchColumns).analyze().modify());
SelectQueryOptions(QueryProcessingStage::FetchColumns).analyze().modify());
interpreter.getQuery()->format(IAST::FormatSettings(ss, false));
}

View File

@ -325,7 +325,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, *context,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
options.subquery_depth, !options.only_analyze);
!options.only_analyze, options);
if (!options.only_analyze)
{
@ -1406,16 +1406,12 @@ void InterpreterSelectQuery::executeFetchColumns(
BlockInputStreams streams;
Pipes pipes;
/// Will work with pipes directly if storage support processors.
/// Code is temporarily copy-pasted while moving to new pipeline.
bool use_pipes = pipeline_with_processors && storage->supportProcessorsPipeline();
if (use_pipes)
pipes = storage->readWithProcessors(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
if (pipeline_with_processors)
pipes = storage->read(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
else
streams = storage->read(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
streams = storage->readStreams(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
if (streams.empty() && !use_pipes)
if (streams.empty() && !pipeline_with_processors)
{
streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
@ -1446,7 +1442,8 @@ void InterpreterSelectQuery::executeFetchColumns(
}
/// Copy-paste from prev if.
if (pipes.empty() && use_pipes)
/// Code is temporarily copy-pasted while moving to new pipeline.
if (pipes.empty() && pipeline_with_processors)
{
Pipe pipe(std::make_shared<NullSource>(storage->getSampleBlockForColumns(required_columns)));
@ -1475,8 +1472,7 @@ void InterpreterSelectQuery::executeFetchColumns(
if constexpr (pipeline_with_processors)
{
/// Table lock is stored inside pipeline here.
if (use_pipes)
pipeline.addTableLock(table_lock);
pipeline.addTableLock(table_lock);
}
/// Set the limits and quota for reading data, the speed and time of the query.

View File

@ -327,7 +327,7 @@ public:
private:
friend class NonJoinedBlockInputStream;
friend class JoinBlockInputStream;
friend class JoinSource;
std::shared_ptr<AnalyzedJoin> table_join;
ASTTableJoin::Kind kind;

View File

@ -461,7 +461,17 @@ MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & ri
, max_rows_in_right_block(table_join->maxRowsInRightBlock())
{
if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Not supported. PartialMergeJoin supports LEFT and INNER JOINs kinds.", ErrorCodes::NOT_IMPLEMENTED);
switch (table_join->strictness())
{
case ASTTableJoin::Strictness::Any:
case ASTTableJoin::Strictness::All:
case ASTTableJoin::Strictness::Semi:
break;
default:
throw Exception("Not supported. PartialMergeJoin supports ALL, ANY and SEMI JOINs variants.", ErrorCodes::NOT_IMPLEMENTED);
}
if (!max_rows_in_right_block)
throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);

View File

@ -18,6 +18,13 @@ namespace DB
std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
const ASTPtr & table_expression, const Context & context, size_t subquery_depth, const Names & required_source_columns)
{
auto subquery_options = SelectQueryOptions(QueryProcessingStage::Complete, subquery_depth);
return interpretSubquery(table_expression, context, required_source_columns, subquery_options);
}
std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
const ASTPtr & table_expression, const Context & context, const Names & required_source_columns, const SelectQueryOptions & options)
{
if (auto * expr = table_expression->as<ASTTableExpression>())
{
@ -29,7 +36,7 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
else if (expr->database_and_table_name)
table = expr->database_and_table_name;
return interpretSubquery(table, context, subquery_depth, required_source_columns);
return interpretSubquery(table, context, required_source_columns, options);
}
/// Subquery or table name. The name of the table is similar to the subquery `SELECT * FROM t`.
@ -55,7 +62,7 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
auto subquery_options = SelectQueryOptions(QueryProcessingStage::Complete, subquery_depth).subquery();
auto subquery_options = options.subquery();
ASTPtr query;
if (table || function)

View File

@ -11,4 +11,7 @@ class Context;
std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
const ASTPtr & table_expression, const Context & context, size_t subquery_depth, const Names & required_source_columns);
std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
const ASTPtr & table_expression, const Context & context, const Names & required_source_columns, const SelectQueryOptions & options);
}

View File

@ -2,12 +2,20 @@
#include <Processors/Sources/SourceWithProgress.h>
#include <Interpreters/ProcessList.h>
#include <stack>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
namespace DB
{
static void checkProcessorHasSingleOutput(IProcessor * processor)
{
/// SourceFromInputStream may have totals port. Skip this check.
if (typeid_cast<const SourceFromInputStream *>(processor))
return;
size_t num_outputs = processor->getOutputs().size();
if (num_outputs != 1)
throw Exception("All processors in TreeExecutorBlockInputStream must have single output, "
@ -17,7 +25,7 @@ static void checkProcessorHasSingleOutput(IProcessor * processor)
/// Check tree invariants (described in TreeExecutor.h).
/// Collect sources with progress.
static void validateTree(const Processors & processors, IProcessor * root, std::vector<ISourceWithProgress *> & sources)
static void validateTree(const Processors & processors, IProcessor * root, IProcessor * totals_root, std::vector<ISourceWithProgress *> & sources)
{
std::unordered_map<IProcessor *, size_t> index;
@ -34,6 +42,8 @@ static void validateTree(const Processors & processors, IProcessor * root, std::
std::stack<IProcessor *> stack;
stack.push(root);
if (totals_root)
stack.push(totals_root);
while (!stack.empty())
{
@ -50,8 +60,15 @@ static void validateTree(const Processors & processors, IProcessor * root, std::
size_t position = it->second;
if (is_visited[position])
throw Exception("Processor with name " + node->getName() + " was visited twice while traverse in TreeExecutorBlockInputStream. "
{
/// SourceFromInputStream may have totals port. Skip this check.
if (typeid_cast<const SourceFromInputStream *>(node))
continue;
throw Exception("Processor with name " + node->getName() +
" was visited twice while traverse in TreeExecutorBlockInputStream. "
"Passed processors are not tree.", ErrorCodes::LOGICAL_ERROR);
}
is_visited[position] = true;
@ -81,18 +98,33 @@ void TreeExecutorBlockInputStream::init()
throw Exception("No processors were passed to TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
root = &output_port.getProcessor();
IProcessor * totals_root = nullptr;
validateTree(processors, root, sources_with_progress);
if (totals_port)
totals_root = &totals_port->getProcessor();
validateTree(processors, root, totals_root, sources_with_progress);
input_port = std::make_unique<InputPort>(getHeader(), root);
connect(output_port, *input_port);
input_port->setNeeded();
if (totals_port)
{
input_totals_port = std::make_unique<InputPort>(totals_port->getHeader(), root);
connect(*totals_port, *input_totals_port);
input_totals_port->setNeeded();
}
}
void TreeExecutorBlockInputStream::execute()
void TreeExecutorBlockInputStream::execute(bool on_totals)
{
std::stack<IProcessor *> stack;
stack.push(root);
if (on_totals)
stack.push(&totals_port->getProcessor());
else
stack.push(root);
auto prepare_processor = [](IProcessor * processor)
{
@ -141,10 +173,6 @@ void TreeExecutorBlockInputStream::execute()
break;
}
case IProcessor::Status::PortFull:
{
stack.pop();
break;
}
case IProcessor::Status::Finished:
{
stack.pop();
@ -173,17 +201,94 @@ void TreeExecutorBlockInputStream::execute()
}
}
void TreeExecutorBlockInputStream::calcRowsBeforeLimit()
{
std::stack<IProcessor *> stack;
stack.push(root);
size_t rows_before_limit = 0;
bool has_limit = false;
while (!stack.empty())
{
auto processor = stack.top();
stack.pop();
if (auto * limit = typeid_cast<const LimitTransform *>(processor))
{
has_limit = true;
rows_before_limit += limit->getRowsBeforeLimitAtLeast();
}
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
{
if (auto & stream = source->getStream())
{
auto & profile_info = stream->getProfileInfo();
if (profile_info.hasAppliedLimit())
{
has_limit = true;
rows_before_limit += profile_info.getRowsBeforeLimit();
}
}
}
if (auto * sorting = typeid_cast<const PartialSortingTransform *>(processor))
{
rows_before_limit += sorting->getNumReadRows();
has_limit = true;
/// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
continue;
}
for (auto & child_port : processor->getInputs())
{
auto * child_processor = &child_port.getOutputPort().getProcessor();
stack.push(child_processor);
}
}
if (has_limit)
info.setRowsBeforeLimit(rows_before_limit);
}
Block TreeExecutorBlockInputStream::readImpl()
{
while (true)
{
if (input_port->isFinished())
{
if (totals_port && !input_totals_port->isFinished())
{
execute(true);
if (input_totals_port->hasData())
totals = getHeader().cloneWithColumns(input_totals_port->pull().detachColumns());
}
calcRowsBeforeLimit();
return {};
}
if (input_port->hasData())
return getHeader().cloneWithColumns(input_port->pull().detachColumns());
{
auto chunk = input_port->pull();
Block block = getHeader().cloneWithColumns(chunk.detachColumns());
execute();
if (auto & chunk_info = chunk.getChunkInfo())
{
if (auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get()))
{
block.info.bucket_num = agg_info->bucket_num;
block.info.is_overflows = agg_info->is_overflows;
}
}
return block;
}
execute(false);
}
}

View File

@ -29,11 +29,12 @@ public:
for (auto & context : pipe.getContexts())
interpreter_context.emplace_back(context);
totals_port = pipe.getTotalsPort();
processors = std::move(pipe).detachProcessors();
init();
}
String getName() const override { return root->getName(); }
String getName() const override { return "TreeExecutor"; }
Block getHeader() const override { return root->getOutputs().front().getHeader(); }
/// This methods does not affect TreeExecutor as IBlockInputStream itself.
@ -49,9 +50,11 @@ protected:
private:
OutputPort & output_port;
OutputPort * totals_port = nullptr;
Processors processors;
IProcessor * root = nullptr;
std::unique_ptr<InputPort> input_port;
std::unique_ptr<InputPort> input_totals_port;
/// Remember sources that support progress.
std::vector<ISourceWithProgress *> sources_with_progress;
@ -60,7 +63,9 @@ private:
void init();
/// Execute tree step-by-step until root returns next chunk or execution is finished.
void execute();
void execute(bool on_totals);
void calcRowsBeforeLimit();
/// Moved from pipe.
std::vector<std::shared_ptr<Context>> interpreter_context;

View File

@ -15,7 +15,10 @@ ISink::Status ISink::prepare()
return Status::Ready;
if (input.isFinished())
{
onFinish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())

View File

@ -15,6 +15,8 @@ protected:
virtual void consume(Chunk block) = 0;
virtual void onFinish() {}
public:
explicit ISink(Block header);

View File

@ -597,11 +597,14 @@ void QueryPipeline::calcRowsBeforeLimit()
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
{
auto & info = source->getStream().getProfileInfo();
if (info.hasAppliedLimit())
if (auto & stream = source->getStream())
{
has_limit = visited_limit = true;
rows_before_limit_at_least += info.getRowsBeforeLimit();
auto & info = stream->getProfileInfo();
if (info.hasAppliedLimit())
{
has_limit = visited_limit = true;
rows_before_limit_at_least += info.getRowsBeforeLimit();
}
}
}
}
@ -652,6 +655,9 @@ Pipe QueryPipeline::getPipe() &&
for (auto & storage : storage_holders)
pipe.addStorageHolder(storage);
if (totals_having_port)
pipe.setTotalsPort(totals_having_port);
return pipe;
}

View File

@ -0,0 +1,28 @@
#include <Processors/Sources/SinkToOutputStream.h>
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
SinkToOutputStream::SinkToOutputStream(BlockOutputStreamPtr stream_)
: ISink(stream_->getHeader())
, stream(std::move(stream_))
{
}
void SinkToOutputStream::consume(Chunk chunk)
{
if (!initialized)
stream->writePrefix();
initialized = true;
stream->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void SinkToOutputStream::onFinish()
{
stream->writeSuffix();
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Processors/ISink.h>
namespace DB
{
class IBlockOutputStream;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
/// Sink which writes data to IBlockOutputStream.
/// It's a temporary wrapper.
class SinkToOutputStream : public ISink
{
public:
explicit SinkToOutputStream(BlockOutputStreamPtr stream);
String getName() const override { return "SinkToOutputStream"; }
protected:
void consume(Chunk chunk) override;
void onFinish() override;
private:
BlockOutputStreamPtr stream;
bool initialized = false;
};
}

View File

@ -10,6 +10,11 @@ SourceFromInputStream::SourceFromInputStream(BlockInputStreamPtr stream_, bool f
: ISourceWithProgress(stream_->getHeader())
, force_add_aggregating_info(force_add_aggregating_info_)
, stream(std::move(stream_))
{
init();
}
void SourceFromInputStream::init()
{
auto & sample = getPort().getHeader();
for (auto & type : sample.getDataTypes())

View File

@ -19,7 +19,7 @@ public:
Chunk generate() override;
IBlockInputStream & getStream() { return *stream; }
BlockInputStreamPtr & getStream() { return stream; }
void addTotalsPort();
@ -35,7 +35,7 @@ protected:
private:
bool has_aggregate_functions = false;
bool force_add_aggregating_info;
bool force_add_aggregating_info = false;
BlockInputStreamPtr stream;
Chunk totals;
@ -45,6 +45,8 @@ private:
bool is_generating_finished = false;
bool is_stream_finished = false;
bool is_stream_started = false;
void init();
};
}

View File

@ -14,7 +14,7 @@ MergingSortedTransform::MergingSortedTransform(
UInt64 limit_,
bool quiet_,
bool have_all_inputs_)
: IProcessor(InputPorts(num_inputs, header), {materializeBlock(header)})
: IProcessor(InputPorts(num_inputs, header), {header})
, description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, have_all_inputs(have_all_inputs_)
, merged_data(header), source_chunks(num_inputs), cursors(num_inputs)

View File

@ -62,7 +62,7 @@ protected:
{
num_rows = limit_rows;
for (auto & column : columns)
column = (*column->cut(0, num_rows)->convertToFullColumnIfConst()).mutate();
column = (*column->cut(0, num_rows)).mutate();
}
total_merged_rows += num_rows;

View File

@ -404,7 +404,7 @@ void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Settin
}
}
BlockInputStreams IStorage::read(
BlockInputStreams IStorage::readStreams(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -412,7 +412,8 @@ BlockInputStreams IStorage::read(
size_t max_block_size,
unsigned num_streams)
{
auto pipes = readWithProcessors(column_names, query_info, context, processed_stage, max_block_size, num_streams);
ForceTreeShapedPipeline enable_tree_shape(query_info);
auto pipes = read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
BlockInputStreams res;
res.reserve(pipes.size());

View File

@ -261,20 +261,8 @@ public:
* if the storage can return a different number of streams.
*
* It is guaranteed that the structure of the table will not change over the lifetime of the returned streams (that is, there will not be ALTER, RENAME and DROP).
*
* Default implementation calls `readWithProcessors` and wraps into TreeExecutor.
*/
virtual BlockInputStreams read(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/);
/** The same as read, but returns processors.
*/
virtual Pipes readWithProcessors(
virtual Pipes read(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -285,7 +273,15 @@ public:
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual bool supportProcessorsPipeline() const { return false; }
/** The same as read, but returns BlockInputStreams.
*/
BlockInputStreams readStreams(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/);
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.

View File

@ -31,6 +31,7 @@
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Processors/Sources/SourceFromInputStream.h>
namespace DB
@ -117,7 +118,7 @@ StorageKafka::StorageKafka(
}
BlockInputStreams StorageKafka::read(
Pipes StorageKafka::read(
const Names & column_names,
const SelectQueryInfo & /* query_info */,
const Context & context,
@ -126,11 +127,11 @@ BlockInputStreams StorageKafka::read(
unsigned /* num_streams */)
{
if (num_created_consumers == 0)
return BlockInputStreams();
return {};
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
BlockInputStreams streams;
streams.reserve(num_created_consumers);
Pipes pipes;
pipes.reserve(num_created_consumers);
// Claim as many consumers as requested, but don't block
for (size_t i = 0; i < num_created_consumers; ++i)
@ -138,11 +139,12 @@ BlockInputStreams StorageKafka::read(
/// Use a block size of 1; otherwise LIMIT won't work properly, as it will buffer excess messages in the last block.
/// TODO: this probably leads to awful performance.
/// FIXME: it seems this doesn't help with extra reading and committing of unprocessed messages.
streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1));
/// TODO: rewrite KafkaBlockInputStream to KafkaSource. It is still used in another place.
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1)));
}
LOG_DEBUG(log, "Starting reading " << streams.size() << " streams");
return streams;
LOG_DEBUG(log, "Starting reading " << pipes.size() << " streams");
return pipes;
}
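The change above shows the recurring adaptation pattern of this commit: a legacy IBlockInputStream is wrapped into a SourceFromInputStream and pushed into a Pipe. A minimal sketch of that pattern in isolation (the helper name is hypothetical; only the SourceFromInputStream and Pipe types already used in this diff are assumed):
/// Hypothetical helper, for illustration only: adapt legacy block input streams into Pipes.
Pipes wrapStreamsIntoPipes(const BlockInputStreams & streams)
{
    Pipes pipes;
    pipes.reserve(streams.size());
    for (const auto & stream : streams)
        pipes.emplace_back(std::make_shared<SourceFromInputStream>(stream));
    return pipes;
}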

View File

@ -36,7 +36,7 @@ public:
void startup() override;
void shutdown() override;
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Processors/Pipe.h>
namespace DB
@ -13,21 +14,21 @@ class StorageBlocks : public IStorage
*/
public:
StorageBlocks(const StorageID & table_id_,
const ColumnsDescription & columns_, BlockInputStreams streams_,
const ColumnsDescription & columns_, Pipes pipes_,
QueryProcessingStage::Enum to_stage_)
: IStorage(table_id_), streams(streams_), to_stage(to_stage_)
: IStorage(table_id_), pipes(std::move(pipes_)), to_stage(to_stage_)
{
setColumns(columns_);
}
static StoragePtr createStorage(const StorageID & table_id,
const ColumnsDescription & columns, BlockInputStreams streams, QueryProcessingStage::Enum to_stage)
const ColumnsDescription & columns, Pipes pipes, QueryProcessingStage::Enum to_stage)
{
return std::make_shared<StorageBlocks>(table_id, columns, streams, to_stage);
return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
}
std::string getName() const override { return "Blocks"; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; }
BlockInputStreams read(
Pipes read(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -35,12 +36,12 @@ public:
size_t /*max_block_size*/,
unsigned /*num_streams*/) override
{
return streams;
return std::move(pipes);
}
private:
Block res_block;
BlockInputStreams streams;
Pipes pipes;
QueryProcessingStage::Enum to_stage;
};

View File

@ -40,6 +40,7 @@ limitations under the License. */
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Access/AccessFlags.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
namespace DB
@ -123,26 +124,24 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(const Context & conte
return new_mergeable_blocks;
}
BlockInputStreams StorageLiveView::blocksToInputStreams(BlocksPtrs blocks, Block & sample_block)
Pipes StorageLiveView::blocksToPipes(BlocksPtrs blocks, Block & sample_block)
{
BlockInputStreams streams;
Pipes pipes;
for (auto & blocks_ : *blocks)
{
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
streams.push_back(std::move(stream));
}
return streams;
pipes.emplace_back(std::make_shared<BlocksSource>(std::make_shared<BlocksPtr>(blocks_), sample_block));
return pipes;
}
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from)
BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
{
auto block_context = std::make_unique<Context>(global_context);
block_context->makeQueryContext();
auto blocks_storage_id = getBlocksStorageID();
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, getParentStorage()->getColumns(),
std::move(from), QueryProcessingStage::WithMergeableState);
std::move(pipes), QueryProcessingStage::WithMergeableState);
block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage);
@ -179,7 +178,7 @@ void StorageLiveView::writeIntoLiveView(
}
bool is_block_processed = false;
BlockInputStreams from;
Pipes from;
MergeableBlocksPtr mergeable_blocks;
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
@ -191,7 +190,7 @@ void StorageLiveView::writeIntoLiveView(
{
mergeable_blocks = live_view.collectMergeableBlocks(context);
live_view.setMergeableBlocks(mergeable_blocks);
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
from = live_view.blocksToPipes(mergeable_blocks->blocks, mergeable_blocks->sample_block);
is_block_processed = true;
}
}
@ -205,10 +204,11 @@ void StorageLiveView::writeIntoLiveView(
if (live_view.getInnerSubQuery())
mergeable_query = live_view.getInnerSubQuery();
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
Pipes pipes;
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), block.rows())));
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id,
live_view.getParentStorage()->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns);
live_view.getParentStorage()->getColumns(), std::move(pipes), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage,
QueryProcessingStage::WithMergeableState);
@ -227,11 +227,11 @@ void StorageLiveView::writeIntoLiveView(
mergeable_blocks = live_view.getMergeableBlocks();
mergeable_blocks->blocks->push_back(new_mergeable_blocks);
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
from = live_view.blocksToPipes(mergeable_blocks->blocks, mergeable_blocks->sample_block);
}
}
BlockInputStreamPtr data = live_view.completeQuery(from);
BlockInputStreamPtr data = live_view.completeQuery(std::move(from));
copyData(*data, *output);
}
@ -334,8 +334,8 @@ bool StorageLiveView::getNewBlocks()
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
mergeable_blocks = collectMergeableBlocks(*live_view_context);
BlockInputStreams from = blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
BlockInputStreamPtr data = completeQuery({from});
Pipes from = blocksToPipes(mergeable_blocks->blocks, mergeable_blocks->sample_block);
BlockInputStreamPtr data = completeQuery(std::move(from));
while (Block block = data->read())
{
@ -523,7 +523,7 @@ void StorageLiveView::refresh(const Context & context)
}
}
BlockInputStreams StorageLiveView::read(
Pipes StorageLiveView::read(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -531,7 +531,7 @@ BlockInputStreams StorageLiveView::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
std::shared_ptr<BlocksBlockInputStream> stream;
Pipes pipes;
{
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
@ -539,9 +539,9 @@ BlockInputStreams StorageLiveView::read(
if (getNewBlocks())
condition.notify_all();
}
stream = std::make_shared<BlocksBlockInputStream>(blocks_ptr, getHeader());
pipes.emplace_back(std::make_shared<BlocksSource>(blocks_ptr, getHeader()));
}
return { stream };
return pipes;
}
BlockInputStreams StorageLiveView::watch(

View File

@ -126,7 +126,7 @@ public:
void refresh(const Context & context);
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -148,7 +148,7 @@ public:
/// Collect mergeable blocks and their sample. Must be called holding mutex
MergeableBlocksPtr collectMergeableBlocks(const Context & context);
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr completeQuery(BlockInputStreams from);
BlockInputStreamPtr completeQuery(Pipes pipes);
void setMergeableBlocks(MergeableBlocksPtr blocks) { mergeable_blocks = blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
@ -159,7 +159,7 @@ public:
Block getHeader() const;
/// convert blocks to input streams
static BlockInputStreams blocksToInputStreams(BlocksPtrs blocks, Block & sample_block);
static Pipes blocksToPipes(BlocksPtrs blocks, Block & sample_block);
static void writeIntoLiveView(
StorageLiveView & live_view,

View File

@ -19,7 +19,7 @@ class StorageFromMergeTreeDataPart : public ext::shared_ptr_helper<StorageFromMe
public:
String getName() const override { return "FromMergeTreeDataPart"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -27,19 +27,12 @@ public:
size_t max_block_size,
unsigned num_streams) override
{
auto pipes = MergeTreeDataSelectExecutor(part->storage).readFromParts(
return MergeTreeDataSelectExecutor(part->storage).readFromParts(
{part}, column_names, query_info, context, max_block_size, num_streams);
/// Wrap processors into BlockInputStreams. This is temporary; it will be changed to the processors interface later.
BlockInputStreams streams;
streams.reserve(pipes.size());
for (auto & pipe : pipes)
streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipe)));
return streams;
}
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override

View File

@ -80,6 +80,28 @@ struct SelectQueryInfo
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;
/// Temporary flag is needed to support old pipeline with input streams.
/// If enabled, then pipeline returned by storage must be a tree.
/// Processors from the tree can't return ExpandPipeline status.
mutable bool force_tree_shaped_pipeline = false;
};
/// RAII class to enable force_tree_shaped_pipeline for SelectQueryInfo.
/// Looks awful, but I hope it's temporary.
struct ForceTreeShapedPipeline
{
explicit ForceTreeShapedPipeline(const SelectQueryInfo & info_) : info(info_)
{
force_tree_shaped_pipeline = info.force_tree_shaped_pipeline;
info.force_tree_shaped_pipeline = true;
}
~ForceTreeShapedPipeline() { info.force_tree_shaped_pipeline = force_tree_shaped_pipeline; }
private:
bool force_tree_shaped_pipeline;
const SelectQueryInfo & info;
};
}
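A minimal usage sketch of the RAII guard above, mirroring the readStreams() wrapper shown earlier in this diff (the free function name is hypothetical; TreeExecutorBlockInputStream is the wrapper referenced elsewhere in this commit):
/// Illustrative only: temporarily force the tree-shaped (old) pipeline for one read call.
BlockInputStreams readViaTreeShapedPipeline(
    IStorage & storage, const Names & column_names, const SelectQueryInfo & query_info,
    const Context & context, QueryProcessingStage::Enum processed_stage,
    size_t max_block_size, unsigned num_streams)
{
    /// The flag is set here and restored automatically when the guard goes out of scope.
    ForceTreeShapedPipeline enable_tree_shape(query_info);

    Pipes pipes = storage.read(column_names, query_info, context, processed_stage, max_block_size, num_streams);

    BlockInputStreams streams;
    streams.reserve(pipes.size());
    for (auto & pipe : pipes)
        streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipe)));
    return streams;
}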

View File

@ -155,28 +155,8 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
return QueryProcessingStage::FetchColumns;
}
static Pipes readAsPipes(
const StoragePtr & storage,
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
if (storage->supportProcessorsPipeline())
return storage->readWithProcessors(column_names, query_info, context, processed_stage, max_block_size, num_streams);
auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
Pipes pipes;
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(stream));
return pipes;
};
Pipes StorageBuffer::readWithProcessors(
Pipes StorageBuffer::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -207,7 +187,7 @@ Pipes StorageBuffer::readWithProcessors(
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination);
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
pipes_from_dst = readAsPipes(destination, column_names, query_info, context, processed_stage, max_block_size, num_streams);
pipes_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}
else
{
@ -242,7 +222,7 @@ Pipes StorageBuffer::readWithProcessors(
}
else
{
pipes_from_dst = readAsPipes(destination, columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
pipes_from_dst = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & pipe : pipes_from_dst)
{
pipe.addSimpleTransform(std::make_shared<AddingMissedTransform>(

View File

@ -56,7 +56,7 @@ public:
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
Pipes readWithProcessors(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -64,8 +64,6 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void startup() override;

View File

@ -11,6 +11,8 @@
#include <Parsers/ASTLiteral.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
namespace DB
@ -43,7 +45,7 @@ StorageDictionary::StorageDictionary(
}
}
BlockInputStreams StorageDictionary::read(
Pipes StorageDictionary::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -52,7 +54,12 @@ BlockInputStreams StorageDictionary::read(
const unsigned /*threads*/)
{
auto dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name);
return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
auto source = std::make_shared<SourceFromInputStream>(stream);
/// TODO: update dictionary interface for processors.
Pipes pipes;
pipes.emplace_back(std::move(source));
return pipes;
}
NamesAndTypesList StorageDictionary::getNamesAndTypes(const DictionaryStructure & dictionary_structure)

View File

@ -25,12 +25,12 @@ class StorageDictionary : public ext::shared_ptr_helper<StorageDictionary>, publ
public:
std::string getName() const override { return "Dictionary"; }
BlockInputStreams read(const Names & column_names,
Pipes read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
size_t max_block_size,
unsigned threads) override;
static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure);

View File

@ -340,7 +340,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
: QueryProcessingStage::WithMergeableState;
}
BlockInputStreams StorageDistributed::read(
Pipes StorageDistributed::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -413,7 +413,7 @@ BlockInputStreams StorageDistributed::read(
}
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
select_stream_factory, cluster, modified_query_ast, context, settings, query_info);
}

View File

@ -69,7 +69,7 @@ public:
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ClusterPtr & cluster) const;
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -33,6 +33,8 @@
#include <re2/re2.h>
#include <filesystem>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace fs = std::filesystem;
@ -200,16 +202,47 @@ StorageFile::StorageFile(CommonArguments args)
setConstraints(args.constraints);
}
class StorageFileBlockInputStream : public IBlockInputStream
class StorageFileSource : public SourceWithProgress
{
public:
StorageFileBlockInputStream(std::shared_ptr<StorageFile> storage_,
const Context & context_, UInt64 max_block_size_,
std::string file_path_, bool need_path, bool need_file,
const CompressionMethod compression_method_,
BlockInputStreamPtr prepared_reader = nullptr)
: storage(std::move(storage_)), reader(std::move(prepared_reader)),
context(context_), max_block_size(max_block_size_), compression_method(compression_method_)
struct FilesInfo
{
std::vector<std::string> files;
std::atomic<size_t> next_file_to_read = 0;
bool need_path_column = false;
bool need_file_column = false;
};
using FilesInfoPtr = std::shared_ptr<FilesInfo>;
static Block getHeader(StorageFile & storage, bool need_path_column, bool need_file_column)
{
auto header = storage.getSampleBlock();
/// Note: AddingDefaultsBlockInputStream doesn't change header.
if (need_path_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (need_file_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
return header;
}
StorageFileSource(
std::shared_ptr<StorageFile> storage_,
const Context & context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnDefaults column_defaults_)
: SourceWithProgress(getHeader(*storage_, files_info_->need_path_column, files_info_->need_file_column))
, storage(std::move(storage_))
, files_info(std::move(files_info_))
, column_defaults(std::move(column_defaults_))
, context(context_)
, max_block_size(max_block_size_)
{
if (storage->use_table_fd)
{
@ -233,9 +266,6 @@ public:
else
{
shared_lock = std::shared_lock(storage->rwlock);
file_path = std::make_optional(file_path_);
with_file_column = need_file;
with_path_column = need_path;
}
}
@ -244,82 +274,111 @@ public:
return storage->getName();
}
Block readImpl() override
Chunk generate() override
{
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.
if (!reader)
while (!finished_generate)
{
read_buf = wrapReadBufferWithCompressionMethod(storage->use_table_fd
? std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd)
: std::make_unique<ReadBufferFromFile>(file_path.value()),
compression_method);
reader = FormatFactory::instance().getInput(storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size);
reader->readPrefix();
}
auto res = reader->read();
/// Enrich with virtual columns.
if (res && file_path)
{
if (with_path_column)
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.value())->convertToFullColumnIfConst(),
std::make_shared<DataTypeString>(), "_path"}); /// construction with const is for probably generating less code
if (with_file_column)
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.
if (!reader)
{
size_t last_slash_pos = file_path.value().find_last_of('/');
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.value().substr(
last_slash_pos + 1))->convertToFullColumnIfConst(),
std::make_shared<DataTypeString>(), "_file"});
}
}
if (!storage->use_table_fd)
{
auto current_file = files_info->next_file_to_read.fetch_add(1);
if (current_file >= files_info->files.size())
return {};
/// Close file prematurely if stream was ended.
if (!res)
{
current_path = files_info->files[current_file];
/// Special case for distributed format. Defaults are not needed here.
if (storage->format_name == "Distributed")
{
reader = StorageDistributedDirectoryMonitor::createStreamFromFile(current_path);
continue;
}
}
std::unique_ptr<ReadBuffer> nested_buffer;
CompressionMethod method;
if (storage->use_table_fd)
{
nested_buffer = std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd);
method = chooseCompressionMethod("", storage->compression_method);
}
else
{
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path);
method = chooseCompressionMethod(current_path, storage->compression_method);
}
read_buf = wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
reader = FormatFactory::instance().getInput(
storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
reader->readPrefix();
}
if (auto res = reader->read())
{
Columns columns = res.getColumns();
UInt64 num_rows = res.rows();
/// Enrich with virtual columns.
if (files_info->need_path_column)
{
auto column = DataTypeString().createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
if (files_info->need_file_column)
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeString().createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
return Chunk(std::move(columns), num_rows);
}
/// Read only once for file descriptor.
if (storage->use_table_fd)
finished_generate = true;
/// Close file prematurely if stream was ended.
reader->readSuffix();
reader.reset();
read_buf.reset();
}
return res;
}
Block getHeader() const override
{
auto res = storage->getSampleBlock();
if (res && file_path)
{
if (with_path_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
}
return res;
return {};
}
private:
std::shared_ptr<StorageFile> storage;
std::optional<std::string> file_path;
bool with_path_column = false;
bool with_file_column = false;
FilesInfoPtr files_info;
String current_path;
Block sample_block;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
ColumnDefaults column_defaults;
const Context & context; /// TODO Untangle potential issues with context lifetime.
UInt64 max_block_size;
const CompressionMethod compression_method;
bool finished_generate = false;
std::shared_lock<std::shared_mutex> shared_lock;
std::unique_lock<std::shared_mutex> unique_lock;
};
BlockInputStreams StorageFile::read(
Pipes StorageFile::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -337,29 +396,30 @@ BlockInputStreams StorageFile::read(
if (paths.size() == 1 && !Poco::File(paths[0]).exists())
throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
blocks_input.reserve(paths.size());
bool need_path_column = false;
bool need_file_column = false;
auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
files_info->files = paths;
for (const auto & column : column_names)
{
if (column == "_path")
need_path_column = true;
files_info->need_path_column = true;
if (column == "_file")
need_file_column = true;
files_info->need_file_column = true;
}
for (const auto & file_path : paths)
{
BlockInputStreamPtr cur_block;
if (format_name == "Distributed")
cur_block = StorageDistributedDirectoryMonitor::createStreamFromFile(file_path);
else
cur_block = std::make_shared<StorageFileBlockInputStream>(
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, need_path_column, need_file_column, chooseCompressionMethod(file_path, compression_method));
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
}
return narrowBlockInputStreams(blocks_input, num_streams);
if (num_streams > paths.size())
num_streams = paths.size();
Pipes pipes;
pipes.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<StorageFileSource>(this_ptr, context, max_block_size, files_info, column_defaults));
return pipes;
}
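The shared FilesInfo above is what lets several StorageFileSource instances split one file list between them: each source atomically claims the next index until the list is exhausted. The coordination reduces to a single fetch_add, sketched here in isolation with illustrative names:
#include <atomic>
#include <optional>
#include <string>
#include <vector>

/// Illustrative sketch of the pattern: N parallel sources, one shared cursor.
struct SharedWork
{
    std::vector<std::string> items;
    std::atomic<size_t> next{0};
};

/// Each source calls this in a loop until it returns an empty optional.
std::optional<std::string> claimNext(SharedWork & work)
{
    size_t pos = work.next.fetch_add(1);   /// atomically reserve one index
    if (pos >= work.items.size())
        return {};                         /// everything is already claimed
    return work.items[pos];
}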

View File

@ -24,7 +24,7 @@ class StorageFile : public ext::shared_ptr_helper<StorageFile>, public IStorage
public:
std::string getName() const override { return "File"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -53,7 +53,7 @@ public:
};
protected:
friend class StorageFileBlockInputStream;
friend class StorageFileSource;
friend class StorageFileBlockOutputStream;
/// From file descriptor

View File

@ -25,6 +25,8 @@
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
{
@ -63,24 +65,48 @@ StorageHDFS::StorageHDFS(const String & uri_,
namespace
{
class HDFSBlockInputStream : public IBlockInputStream
class HDFSSource : public SourceWithProgress
{
public:
HDFSBlockInputStream(const String & uri,
bool need_path,
bool need_file,
const String & format,
const Block & sample_block,
const Context & context,
UInt64 max_block_size,
const CompressionMethod compression_method)
struct SourcesInfo
{
std::vector<String> uris;
std::atomic<size_t> next_uri_to_read = 0;
bool need_path_column = false;
bool need_file_column = false;
};
using SourcesInfoPtr = std::shared_ptr<SourcesInfo>;
static Block getHeader(Block header, bool need_path_column, bool need_file_column)
{
if (need_path_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (need_file_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
return header;
}
HDFSSource(
SourcesInfoPtr source_info_,
String uri_,
String format_,
String compression_method_,
Block sample_block_,
const Context & context_,
UInt64 max_block_size_)
: SourceWithProgress(getHeader(sample_block_, source_info_->need_path_column, source_info_->need_file_column))
, source_info(std::move(source_info_))
, uri(std::move(uri_))
, format(std::move(format_))
, compression_method(compression_method_)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
, context(context_)
{
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri), compression_method);
file_path = uri;
with_file_column = need_file;
with_path_column = need_path;
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
}
String getName() const override
@ -88,53 +114,66 @@ public:
return "HDFS";
}
Block readImpl() override
Chunk generate() override
{
auto res = reader->read();
if (res)
while (true)
{
if (with_path_column)
res.insert({DataTypeString().createColumnConst(res.rows(), file_path)->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_path"}); /// construction with const is for probably generating less code
if (with_file_column)
if (!reader)
{
size_t last_slash_pos = file_path.find_last_of('/');
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.substr(
last_slash_pos + 1))->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_file"});
auto pos = source_info->next_uri_to_read.fetch_add(1);
if (pos >= source_info->uris.size())
return {};
auto path = source_info->uris[pos];
current_path = uri + path;
auto compression = chooseCompressionMethod(path, compression_method);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path), compression);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
reader->readPrefix();
}
if (auto res = reader->read())
{
Columns columns = res.getColumns();
UInt64 num_rows = res.rows();
/// Enrich with virtual columns.
if (source_info->need_path_column)
{
auto column = DataTypeString().createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
if (source_info->need_file_column)
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeString().createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
return Chunk(std::move(columns), num_rows);
}
reader->readSuffix();
}
return res;
}
Block getHeader() const override
{
auto res = reader->getHeader();
if (res)
{
if (with_path_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
}
return res;
}
void readPrefixImpl() override
{
reader->readPrefix();
}
void readSuffixImpl() override
{
reader->readSuffix();
}
private:
BlockInputStreamPtr reader;
String file_path;
bool with_path_column = false;
bool with_file_column = false;
SourcesInfoPtr source_info;
String uri;
String format;
String compression_method;
String current_path;
UInt64 max_block_size;
Block sample_block;
const Context & context;
};
class HDFSBlockOutputStream : public IBlockOutputStream
@ -228,7 +267,7 @@ Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, c
}
BlockInputStreams StorageHDFS::read(
Pipes StorageHDFS::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context_,
@ -243,24 +282,27 @@ BlockInputStreams StorageHDFS::read(
HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
HDFSFSPtr fs = createHDFSFS(builder.get());
const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);
BlockInputStreams result;
bool need_path_column = false;
bool need_file_column = false;
auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();
sources_info->uris = LSWithRegexpMatching("/", fs, path_from_uri);
for (const auto & column : column_names)
{
if (column == "_path")
need_path_column = true;
sources_info->need_path_column = true;
if (column == "_file")
need_file_column = true;
}
for (const auto & res_path : res_paths)
{
result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, need_path_column, need_file_column, format_name, getSampleBlock(), context_,
max_block_size, chooseCompressionMethod(res_path, compression_method)));
sources_info->need_file_column = true;
}
return narrowBlockInputStreams(result, num_streams);
if (num_streams > sources_info->uris.size())
num_streams = sources_info->uris.size();
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<HDFSSource>(
sources_info, uri_without_path, format_name, compression_method, getSampleBlock(), context_, max_block_size));
return pipes;
}
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const Context & /*context*/)

View File

@ -19,7 +19,7 @@ class StorageHDFS : public ext::shared_ptr_helper<StorageHDFS>, public IStorage
public:
String getName() const override { return "HDFS"; }
BlockInputStreams read(const Names & column_names,
Pipes read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,

View File

@ -5,6 +5,9 @@
#include <DataStreams/IBlockInputStream.h>
#include <memory>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
namespace DB
@ -22,26 +25,28 @@ StorageInput::StorageInput(const String & table_name_, const ColumnsDescription
}
class StorageInputBlockInputStream : public IBlockInputStream
class StorageInputSource : public SourceWithProgress
{
public:
StorageInputBlockInputStream(Context & context_, const Block sample_block_)
: context(context_),
sample_block(sample_block_)
StorageInputSource(Context & context_, Block sample_block)
: SourceWithProgress(std::move(sample_block)), context(context_)
{
}
Block readImpl() override { return context.getInputBlocksReaderCallback()(context); }
void readPrefix() override {}
void readSuffix() override {}
Chunk generate() override
{
auto block = context.getInputBlocksReaderCallback()(context);
if (!block)
return {};
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
String getName() const override { return "Input"; }
Block getHeader() const override { return sample_block; }
private:
Context & context;
const Block sample_block;
};
@ -51,26 +56,29 @@ void StorageInput::setInputStream(BlockInputStreamPtr input_stream_)
}
BlockInputStreams StorageInput::read(const Names & /*column_names*/,
Pipes StorageInput::read(const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
Pipes pipes;
Context & query_context = const_cast<Context &>(context).getQueryContext();
/// It is TCP request if we have callbacks for input().
if (query_context.getInputBlocksReaderCallback())
{
/// Send structure to the client.
query_context.initializeInput(shared_from_this());
input_stream = std::make_shared<StorageInputBlockInputStream>(query_context, getSampleBlock());
pipes.emplace_back(std::make_shared<StorageInputSource>(query_context, getSampleBlock()));
return pipes;
}
if (!input_stream)
throw Exception("Input stream is not initialized, input() must be used only in INSERT SELECT query", ErrorCodes::INVALID_USAGE_OF_INPUT);
return {input_stream};
pipes.emplace_back(std::make_shared<SourceFromInputStream>(input_stream));
return pipes;
}
}

View File

@ -17,7 +17,7 @@ public:
/// A table will read from this stream.
void setInputStream(BlockInputStreamPtr input_stream_);
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -15,6 +15,8 @@
#include <Poco/String.h> /// toLower
#include <Poco/File.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
@ -236,11 +238,15 @@ size_t rawSize(const StringRef & t)
return t.size;
}
class JoinBlockInputStream : public IBlockInputStream
class JoinSource : public SourceWithProgress
{
public:
JoinBlockInputStream(const Join & parent_, UInt64 max_block_size_, Block && sample_block_)
: parent(parent_), lock(parent.data->rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_))
JoinSource(const Join & parent_, UInt64 max_block_size_, Block sample_block_)
: SourceWithProgress(sample_block_)
, parent(parent_)
, lock(parent.data->rwlock)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
{
columns.resize(sample_block.columns());
column_indices.resize(sample_block.columns());
@ -264,20 +270,17 @@ public:
String getName() const override { return "Join"; }
Block getHeader() const override { return sample_block; }
protected:
Block readImpl() override
Chunk generate() override
{
if (parent.data->blocks.empty())
return Block();
return {};
Block block;
Chunk chunk;
if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps,
[&](auto kind, auto strictness, auto & map) { block = createBlock<kind, strictness>(map); }))
[&](auto kind, auto strictness, auto & map) { chunk = createChunk<kind, strictness>(map); }))
throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR);
return block;
return chunk;
}
private:
@ -295,7 +298,7 @@ private:
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
Block createBlock(const Maps & maps)
Chunk createChunk(const Maps & maps)
{
for (size_t i = 0; i < sample_block.columns(); ++i)
{
@ -306,7 +309,7 @@ private:
if (key_pos == i)
{
// unwrap null key column
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*columns[i]);
auto & nullable_col = assert_cast<ColumnNullable &>(*columns[i]);
columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable();
}
else
@ -334,22 +337,25 @@ private:
if (!rows_added)
return {};
Block res = sample_block.cloneEmpty();
Columns res_columns;
res_columns.reserve(columns.size());
for (size_t i = 0; i < columns.size(); ++i)
if (column_with_null[i])
{
if (key_pos == i)
res.getByPosition(i).column = makeNullable(std::move(columns[i]));
res_columns.emplace_back(makeNullable(std::move(columns[i])));
else
{
const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]);
res.getByPosition(i).column = nullable_col.getNestedColumnPtr();
const auto & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]);
res_columns.emplace_back(makeNullable(nullable_col.getNestedColumnPtr()));
}
}
else
res.getByPosition(i).column = std::move(columns[i]);
res_columns.emplace_back(std::move(columns[i]));
return res;
UInt64 num_rows = res_columns.at(0)->size();
return Chunk(std::move(res_columns), num_rows);
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
@ -439,7 +445,7 @@ private:
// TODO: multiple stream read and index read
BlockInputStreams StorageJoin::read(
Pipes StorageJoin::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -448,7 +454,11 @@ BlockInputStreams StorageJoin::read(
unsigned /*num_streams*/)
{
check(column_names);
return {std::make_shared<JoinBlockInputStream>(*join, max_block_size, getSampleBlockForColumns(column_names))};
Pipes pipes;
pipes.emplace_back(std::make_shared<JoinSource>(*join, max_block_size, getSampleBlockForColumns(column_names)));
return pipes;
}
}

View File

@ -36,7 +36,7 @@ public:
/// Verify that the data structure is suitable for implementing this type of JOIN.
void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const;
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -24,6 +24,9 @@
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk"
@ -44,13 +47,25 @@ namespace ErrorCodes
}
class LogBlockInputStream final : public IBlockInputStream
class LogSource final : public SourceWithProgress
{
public:
LogBlockInputStream(
static Block getHeader(const NamesAndTypesList & columns)
{
Block res;
for (const auto & name_type : columns)
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
return Nested::flatten(res);
}
LogSource(
size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_,
size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
: block_size(block_size_),
: SourceWithProgress(getHeader(columns_)),
block_size(block_size_),
columns(columns_),
storage(storage_),
mark_number(mark_number_),
@ -61,18 +76,8 @@ public:
String getName() const override { return "Log"; }
Block getHeader() const override
{
Block res;
for (const auto & name_type : columns)
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
return Nested::flatten(res);
}
protected:
Block readImpl() override;
Chunk generate() override;
private:
size_t block_size;
@ -184,15 +189,15 @@ private:
};
Block LogBlockInputStream::readImpl()
Chunk LogSource::generate()
{
Block res;
if (rows_read == rows_limit)
return res;
return {};
if (storage.disk->isDirectoryEmpty(storage.table_path))
return res;
return {};
/// How many rows to read for the next block.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
@ -211,7 +216,7 @@ Block LogBlockInputStream::readImpl()
throw;
}
if (column->size())
if (!column->empty())
res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
}
@ -227,11 +232,13 @@ Block LogBlockInputStream::readImpl()
streams.clear();
}
return Nested::flatten(res);
res = Nested::flatten(res);
UInt64 num_rows = res.rows();
return Chunk(res.getColumns(), num_rows);
}
void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read)
void LogSource::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read)
{
IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
@ -568,7 +575,7 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
return it->second.marks;
}
BlockInputStreams StorageLog::read(
Pipes StorageLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -583,7 +590,7 @@ BlockInputStreams StorageLog::read(
std::shared_lock<std::shared_mutex> lock(rwlock);
BlockInputStreams res;
Pipes pipes;
const Marks & marks = getMarksWithRealRowCount();
size_t marks_size = marks.size();
@ -601,7 +608,7 @@ BlockInputStreams StorageLog::read(
size_t rows_begin = mark_begin ? marks[mark_begin - 1].rows : 0;
size_t rows_end = mark_end ? marks[mark_end - 1].rows : 0;
res.emplace_back(std::make_shared<LogBlockInputStream>(
pipes.emplace_back(std::make_shared<LogSource>(
max_block_size,
all_columns,
*this,
@ -610,7 +617,7 @@ BlockInputStreams StorageLog::read(
max_read_buffer_size));
}
return res;
return pipes;
}
BlockOutputStreamPtr StorageLog::write(

View File

@ -17,14 +17,14 @@ namespace DB
*/
class StorageLog : public ext::shared_ptr_helper<StorageLog>, public IStorage
{
friend class LogBlockInputStream;
friend class LogSource;
friend class LogBlockOutputStream;
friend struct ext::shared_ptr_helper<StorageLog>;
public:
String getName() const override { return "Log"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -20,6 +20,7 @@
#include <Storages/ReadInOrderOptimizer.h>
#include <Common/typeid_cast.h>
#include <Processors/Sources/SourceFromInputStream.h>
namespace DB
@ -179,7 +180,7 @@ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(cons
return getTargetTable()->getQueryProcessingStage(context);
}
BlockInputStreams StorageMaterializedView::read(
Pipes StorageMaterializedView::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -192,10 +193,12 @@ BlockInputStreams StorageMaterializedView::read(
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & stream : streams)
stream->addTableLock(lock);
return streams;
Pipes pipes = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & pipe : pipes)
pipe.addTableLock(lock);
return pipes;
}
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context)

View File

@ -66,7 +66,7 @@ public:
ActionLock getActionLock(StorageActionBlockType type) override;
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -6,6 +6,8 @@
#include <Storages/StorageFactory.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
@ -17,34 +19,34 @@ namespace ErrorCodes
}
class MemoryBlockInputStream : public IBlockInputStream
class MemorySource : public SourceWithProgress
{
public:
MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage_)
: column_names(column_names_), begin(begin_), end(end_), it(begin), storage(storage_) {}
MemorySource(Names column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage)
: SourceWithProgress(storage.getSampleBlockForColumns(column_names_))
, column_names(std::move(column_names_)), begin(begin_), end(end_), it(begin) {}
String getName() const override { return "Memory"; }
Block getHeader() const override { return storage.getSampleBlockForColumns(column_names); }
protected:
Block readImpl() override
Chunk generate() override
{
if (it == end)
{
return Block();
return {};
}
else
{
Block src = *it;
Block res;
Columns columns;
columns.reserve(column_names.size());
/// Add only the required columns.
for (size_t i = 0, size = column_names.size(); i < size; ++i)
res.insert(src.getByName(column_names[i]));
for (const auto & name : column_names)
columns.emplace_back(src.getByName(name).column);
++it;
return res;
return Chunk(std::move(columns), src.rows());
}
}
private:
@ -52,7 +54,6 @@ private:
BlocksList::iterator begin;
BlocksList::iterator end;
BlocksList::iterator it;
const StorageMemory & storage;
};
@ -82,7 +83,7 @@ StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription col
}
BlockInputStreams StorageMemory::read(
Pipes StorageMemory::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -99,7 +100,7 @@ BlockInputStreams StorageMemory::read(
if (num_streams > size)
num_streams = size;
BlockInputStreams res;
Pipes pipes;
for (size_t stream = 0; stream < num_streams; ++stream)
{
@ -109,10 +110,10 @@ BlockInputStreams StorageMemory::read(
std::advance(begin, stream * size / num_streams);
std::advance(end, (stream + 1) * size / num_streams);
res.push_back(std::make_shared<MemoryBlockInputStream>(column_names, begin, end, *this));
pipes.emplace_back(std::make_shared<MemorySource>(column_names, begin, end, *this));
}
return res;
return pipes;
}

View File

@ -28,7 +28,7 @@ public:
size_t getSize() const { return data.size(); }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -25,6 +25,11 @@
#include <ext/range.h>
#include <algorithm>
#include <Parsers/queryToString.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/AddingConstColumnTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
namespace DB
@ -154,7 +159,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
}
BlockInputStreams StorageMerge::read(
Pipes StorageMerge::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -162,7 +167,7 @@ BlockInputStreams StorageMerge::read(
const size_t max_block_size,
unsigned num_streams)
{
BlockInputStreams res;
Pipes res;
bool has_table_virtual_column = false;
Names real_column_names;
@ -193,7 +198,7 @@ BlockInputStreams StorageMerge::read(
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
return createSourceStreams(
return createSources(
query_info, processed_stage, max_block_size, header, {}, real_column_names, modified_context, 0, has_table_virtual_column);
size_t tables_count = selected_tables.size();
@ -219,58 +224,38 @@ BlockInputStreams StorageMerge::read(
query_info.input_sorting_info = input_sorting_info;
}
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
for (const auto & table : selected_tables)
{
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
current_streams = std::max(size_t(1), current_streams);
auto & storage = std::get<0>(*it);
auto & storage = std::get<0>(table);
/// If sampling requested, then check that table supports it.
if (query_info.query->as<ASTSelectQuery>()->sample_size() && !storage->supportsSampling())
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
BlockInputStreams source_streams;
if (current_streams)
{
source_streams = createSourceStreams(
query_info, processed_stage, max_block_size, header, *it, real_column_names, modified_context,
auto source_pipes = createSources(
query_info, processed_stage, max_block_size, header, table, real_column_names, modified_context,
current_streams, has_table_virtual_column);
}
else
{
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(
header, [=, this]() mutable -> BlockInputStreamPtr
{
BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size,
header, *it, real_column_names,
modified_context, current_streams, has_table_virtual_column, true);
if (!streams.empty() && streams.size() != 1)
throw Exception("LogicalError: the lazy stream size must to be one or empty.", ErrorCodes::LOGICAL_ERROR);
return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0];
}));
}
res.insert(res.end(), source_streams.begin(), source_streams.end());
for (auto & pipe : source_pipes)
res.emplace_back(std::move(pipe));
}
if (res.empty())
return res;
res = narrowBlockInputStreams(res, num_streams);
return res;
return narrowPipes(std::move(res), num_streams);
}
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams)
Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams)
{
auto & [storage, struct_lock, table_name] = storage_with_lock;
SelectQueryInfo modified_query_info = query_info;
@ -278,12 +263,28 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", table_name);
if (!storage)
return BlockInputStreams{
InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().in};
Pipes pipes;
BlockInputStreams source_streams;
if (!storage)
{
if (query_info.force_tree_shaped_pipeline)
{
/// This flag means that pipeline must be tree-shaped,
/// so we can't enable processors for InterpreterSelectQuery here.
auto stream = InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().in;
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
return pipes;
}
pipes.emplace_back(
InterpreterSelectQuery(modified_query_info.query, modified_context,
std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe());
return pipes;
}
if (processed_stage <= storage->getQueryProcessingStage(modified_context))
{
@ -291,8 +292,7 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size,
UInt32(streams_num));
pipes = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size, UInt32(streams_num));
}
else if (processed_stage > storage->getQueryProcessingStage(modified_context))
{
@ -303,41 +303,52 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
BlockInputStreamPtr interpreter_stream = interpreter.execute().in;
if (query_info.force_tree_shaped_pipeline)
{
BlockInputStreamPtr stream = interpreter.execute().in;
Pipe pipe(std::make_shared<SourceFromInputStream>(std::move(stream)));
pipes.emplace_back(std::move(pipe));
}
else
{
Pipe pipe = interpreter.executeWithProcessors().getPipe();
pipes.emplace_back(std::move(pipe));
}
/** Materialization is needed, since from distributed storage the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream));
pipes.back().addSimpleTransform(std::make_shared<MaterializingTransform>(pipes.back().getHeader()));
}
if (!source_streams.empty())
if (!pipes.empty())
{
if (concat_streams)
if (concat_streams && pipes.size() > 1)
{
BlockInputStreamPtr stream =
source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
auto concat = std::make_shared<ConcatProcessor>(pipes.at(0).getHeader(), pipes.size());
Pipe pipe(std::move(pipes), std::move(concat));
source_streams.resize(1);
source_streams[0] = stream;
pipes = Pipes();
pipes.emplace_back(std::move(pipe));
}
for (BlockInputStreamPtr & source_stream : source_streams)
for (auto & pipe : pipes)
{
if (has_table_virtual_column)
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), table_name, "_table");
pipe.addSimpleTransform(std::make_shared<AddingConstColumnTransform<String>>(
pipe.getHeader(), std::make_shared<DataTypeString>(), table_name, "_table"));
/// Subordinate tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage);
convertingSourceStream(header, modified_context, modified_query_info.query, pipe, processed_stage);
source_stream->addTableLock(struct_lock);
pipe.addTableLock(struct_lock);
}
}
return source_streams;
return pipes;
}
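The materialization comment above concerns the two representations a column may arrive in: a ColumnConst (one value plus a row count) from a source that computed a constant, or a full column from another source. A small sketch of the normalization this relies on, assuming only the DataTypeString and convertToFullColumnIfConst() calls already used throughout this diff:
/// Illustrative only: a constant column must be expanded before it is mixed
/// with full columns produced by other parts of the pipeline.
ColumnPtr makeFullExampleColumn(size_t num_rows)
{
    ColumnPtr constant = DataTypeString().createColumnConst(num_rows, String("example"));
    /// ColumnConst is expanded into num_rows real values; a column that is
    /// already full would be returned unchanged by the same call.
    return constant->convertToFullColumnIfConst();
}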
@ -456,10 +467,10 @@ Block StorageMerge::getQueryHeader(
}
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage)
Pipe & pipe, QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = source_stream->getHeader();
source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
Block before_block_header = pipe.getHeader();
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(before_block_header, header, ConvertingTransform::MatchColumnsMode::Name, context));
auto where_expression = query->as<ASTSelectQuery>()->where();

View File

@ -33,7 +33,7 @@ public:
QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override;
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -77,14 +77,15 @@ protected:
Block getQueryHeader(const Names & column_names, const SelectQueryInfo & query_info,
const Context & context, QueryProcessingStage::Enum processed_stage);
BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams = false);
Pipes createSources(
const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams = false);
void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage);
Pipe & pipe, QueryProcessingStage::Enum processed_stage);
};
}

View File

@ -136,7 +136,7 @@ StorageMergeTree::~StorageMergeTree()
shutdown();
}
Pipes StorageMergeTree::readWithProcessors(
Pipes StorageMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -1042,6 +1042,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, context);
break;
case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition);
String dest_database = command.to_database.empty() ? context.getCurrentDatabase() : command.to_database;
@ -1231,7 +1232,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception("Table " + getStorageID().getNameForLogs() + " supports attachPartitionFrom only for MergeTree family of table engines."
throw Exception("Table " + getStorageID().getNameForLogs() + " supports movePartitionToTable only for MergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy())
throw Exception("Destination table " + dest_table_storage->getStorageID().getNameForLogs() +
@ -1246,13 +1247,13 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
MutableDataPartsVector dst_parts;
static const String TMP_PREFIX = "tmp_replace_from_";
static const String TMP_PREFIX = "tmp_move_from_";
for (const DataPartPtr & src_part : src_parts)
{
if (!dest_table_storage->canReplacePartition(src_part))
throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
"Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
/// This will generate a unique name within the scope of the current server process.
@ -1263,18 +1264,11 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
dst_parts.emplace_back(dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info));
}
/// ATTACH empty part set
/// empty part set
if (dst_parts.empty())
return;
MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id;
drop_range.min_block = 0;
drop_range.max_block = increment.get(); // there will be a "hole" in block numbers
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
/// Atomically add new parts and remove old ones
/// Move new parts to the destination table. NOTE It doesn't look atomic.
try
{
{

View File

@ -37,7 +37,7 @@ public:
bool supportsIndexForIn() const override { return true; }
Pipes readWithProcessors(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -45,8 +45,6 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@ -15,6 +15,8 @@
#include <IO/WriteHelpers.h>
#include <Parsers/ASTLiteral.h>
#include <mysqlxx/Transaction.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
namespace DB
@ -59,7 +61,7 @@ StorageMySQL::StorageMySQL(
}
BlockInputStreams StorageMySQL::read(
Pipes StorageMySQL::read(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const Context & context_,
@ -78,7 +80,12 @@ BlockInputStreams StorageMySQL::read(
sample_block.insert({ column_data.type, column_data.name });
}
return { std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size_) };
Pipes pipes;
/// TODO: rewrite MySQLBlockInputStream
pipes.emplace_back(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size_)));
return pipes;
}
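
Storages such as StorageMySQL that still build an IBlockInputStream migrate by wrapping it in SourceFromInputStream. The helper below is a hypothetical illustration of that adapter pattern; only SourceFromInputStream, Pipe and Pipes come from the diff, the function name is made up:

    #include <Processors/Sources/SourceFromInputStream.h>
    #include <Processors/Pipe.h>

    /// Hypothetical helper: turn a legacy pull-based stream into a single-pipe result.
    static Pipes pipesFromLegacyStream(const BlockInputStreamPtr & stream)
    {
        Pipes pipes;
        pipes.emplace_back(std::make_shared<SourceFromInputStream>(stream));
        return pipes;
    }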

View File

@ -34,7 +34,7 @@ public:
std::string getName() const override { return "MySQL"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -6,6 +6,8 @@
#include <Storages/IStorage.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>
namespace DB
@ -20,7 +22,7 @@ class StorageNull : public ext::shared_ptr_helper<StorageNull>, public IStorage
public:
std::string getName() const override { return "Null"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo &,
const Context & /*context*/,
@ -28,7 +30,9 @@ public:
size_t,
unsigned) override
{
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
Pipes pipes;
pipes.emplace_back(std::make_shared<NullSource>(getSampleBlockForColumns(column_names)));
return pipes;
}
BlockOutputStreamPtr write(const ASTPtr &, const Context &) override

View File

@ -3042,7 +3042,7 @@ ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMerg
return max_added_blocks;
}
Pipes StorageReplicatedMergeTree::readWithProcessors(
Pipes StorageReplicatedMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -3551,9 +3551,11 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::MoveDestinationType::DISK:
movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition);
String dest_database = command.to_database.empty() ? query_context.getCurrentDatabase() : command.to_database;
@ -5118,7 +5120,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception("Table " + getStorageID().getNameForLogs() + " supports attachPartitionFrom only for ReplicatedMergeTree family of table engines."
throw Exception("Table " + getStorageID().getNameForLogs() + " supports movePartitionToTable only for ReplicatedMergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy())
throw Exception("Destination table " + dest_table_storage->getStorageID().getNameForLogs() +
@ -5140,9 +5142,11 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
static const String TMP_PREFIX = "tmp_replace_from_";
static const String TMP_PREFIX = "tmp_move_from_";
auto zookeeper = getZooKeeper();
/// A range for log entry to remove parts from the source table (myself).
MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id;
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
@ -5157,13 +5161,15 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
queue.disableMergesInBlockRange(drop_range_fake_part_name);
}
/// Clone parts into destination table.
for (size_t i = 0; i < src_all_parts.size(); ++i)
{
auto & src_part = src_all_parts[i];
if (!dest_table_storage->canReplacePartition(src_part))
throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
"Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();

View File

@ -88,7 +88,7 @@ public:
bool supportsReplication() const override { return true; }
bool supportsDeduplication() const override { return true; }
Pipes readWithProcessors(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -96,8 +96,6 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@ -18,7 +18,6 @@
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
@ -30,6 +29,9 @@
#include <Common/parseGlobs.h>
#include <re2/re2.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
{
@ -43,28 +45,44 @@ namespace ErrorCodes
namespace
{
class StorageS3BlockInputStream : public IBlockInputStream
class StorageS3Source : public SourceWithProgress
{
public:
StorageS3BlockInputStream(
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column)
{
if (with_path_column)
sample_block.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
sample_block.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
return sample_block;
}
StorageS3Source(
bool need_path,
bool need_file,
const String & format,
const String & name_,
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
UInt64 max_block_size,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const String & bucket,
const String & key)
: name(name_)
: SourceWithProgress(getHeader(sample_block, need_path, need_file))
, name(std::move(name_))
, with_file_column(need_file)
, with_path_column(need_path)
, file_path(bucket + "/" + key)
{
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
}
String getName() const override
@ -72,52 +90,45 @@ namespace
return name;
}
Block readImpl() override
Chunk generate() override
{
auto res = reader->read();
if (res)
if (!reader)
return {};
if (!initialized)
{
reader->readSuffix();
initialized = true;
}
if (auto block = reader->read())
{
auto columns = block.getColumns();
UInt64 num_rows = block.rows();
if (with_path_column)
res.insert({DataTypeString().createColumnConst(res.rows(), file_path)->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_path"}); /// construction with const is for probably generating less code
columns.push_back(DataTypeString().createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
if (with_file_column)
{
size_t last_slash_pos = file_path.find_last_of('/');
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.substr(
last_slash_pos + 1))->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_file"});
columns.push_back(DataTypeString().createColumnConst(num_rows, file_path.substr(
last_slash_pos + 1))->convertToFullColumnIfConst());
}
return Chunk(std::move(columns), num_rows);
}
return res;
}
Block getHeader() const override
{
auto res = reader->getHeader();
if (res)
{
if (with_path_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
}
return res;
}
void readPrefixImpl() override
{
reader->readPrefix();
}
void readSuffixImpl() override
{
reader->readSuffix();
reader.reset();
return {};
}
private:
String name;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
bool initialized = false;
bool with_file_column = false;
bool with_path_column = false;
String file_path;
@ -255,7 +266,7 @@ Strings listFilesWithRegexpMatching(Aws::S3::S3Client & client, const S3::URI &
}
BlockInputStreams StorageS3::read(
Pipes StorageS3::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -263,7 +274,7 @@ BlockInputStreams StorageS3::read(
size_t max_block_size,
unsigned num_streams)
{
BlockInputStreams result;
Pipes pipes;
bool need_path_column = false;
bool need_file_column = false;
for (const auto & column : column_names)
@ -275,28 +286,21 @@ BlockInputStreams StorageS3::read(
}
for (const String & key : listFilesWithRegexpMatching(*client, uri))
{
BlockInputStreamPtr block_input = std::make_shared<StorageS3BlockInputStream>(
pipes.emplace_back(std::make_shared<StorageS3Source>(
need_path_column,
need_file_column,
format_name,
getName(),
getHeaderBlock(column_names),
context,
getColumns().getDefaults(),
max_block_size,
chooseCompressionMethod(uri.endpoint, compression_method),
client,
uri.bucket,
key);
key));
auto column_defaults = getColumns().getDefaults();
if (column_defaults.empty())
result.emplace_back(std::move(block_input));
else
result.emplace_back(std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context));
}
return narrowBlockInputStreams(result, num_streams);
return narrowPipes(std::move(pipes), num_streams);
}
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)

View File

@ -46,7 +46,7 @@ public:
return getSampleBlock();
}
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -31,6 +31,9 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageStripeLog.h>
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>
namespace DB
@ -48,35 +51,46 @@ namespace ErrorCodes
}
class StripeLogBlockInputStream final : public IBlockInputStream
class StripeLogSource final : public SourceWithProgress
{
public:
StripeLogBlockInputStream(StorageStripeLog & storage_, size_t max_read_buffer_size_,
static Block getHeader(
StorageStripeLog & storage,
const Names & column_names,
IndexForNativeFormat::Blocks::const_iterator index_begin,
IndexForNativeFormat::Blocks::const_iterator index_end)
{
if (index_begin == index_end)
return storage.getSampleBlockForColumns(column_names);
/// TODO: check if possible to always return storage.getSampleBlock()
Block header;
for (const auto & column : index_begin->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert(ColumnWithTypeAndName{ type, column.name });
}
return header;
}
StripeLogSource(StorageStripeLog & storage_, const Names & column_names, size_t max_read_buffer_size_,
std::shared_ptr<const IndexForNativeFormat> & index_,
IndexForNativeFormat::Blocks::const_iterator index_begin_,
IndexForNativeFormat::Blocks::const_iterator index_end_)
: storage(storage_), max_read_buffer_size(max_read_buffer_size_),
index(index_), index_begin(index_begin_), index_end(index_end_)
: SourceWithProgress(getHeader(storage_, column_names, index_begin_, index_end_))
, storage(storage_), max_read_buffer_size(max_read_buffer_size_)
, index(index_), index_begin(index_begin_), index_end(index_end_)
{
if (index_begin != index_end)
{
for (const auto & column : index_begin->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert(ColumnWithTypeAndName{ type, column.name });
}
}
}
String getName() const override { return "StripeLog"; }
Block getHeader() const override
{
return header;
}
protected:
Block readImpl() override
Chunk generate() override
{
Block res;
start();
@ -94,7 +108,7 @@ protected:
}
}
return res;
return Chunk(res.getColumns(), res.rows());
}
private:
@ -238,7 +252,7 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Strin
}
BlockInputStreams StorageStripeLog::read(
Pipes StorageStripeLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -252,15 +266,18 @@ BlockInputStreams StorageStripeLog::read(
NameSet column_names_set(column_names.begin(), column_names.end());
Pipes pipes;
String index_file = table_path + "index.mrk";
if (!disk->exists(index_file))
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
{
pipes.emplace_back(std::make_shared<NullSource>(getSampleBlockForColumns(column_names)));
return pipes;
}
CompressedReadBufferFromFile index_in(disk->readFile(index_file, INDEX_BUFFER_SIZE));
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};
BlockInputStreams res;
size_t size = index->blocks.size();
if (num_streams > size)
num_streams = size;
@ -273,13 +290,13 @@ BlockInputStreams StorageStripeLog::read(
std::advance(begin, stream * size / num_streams);
std::advance(end, (stream + 1) * size / num_streams);
res.emplace_back(std::make_shared<StripeLogBlockInputStream>(
*this, context.getSettingsRef().max_read_buffer_size, index, begin, end));
pipes.emplace_back(std::make_shared<StripeLogSource>(
*this, column_names, context.getSettingsRef().max_read_buffer_size, index, begin, end));
}
/// We do not hold the read lock during the reading itself, because we read ranges of data that do not change.
return res;
return pipes;
}
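
The loop above distributes the index blocks of the stripe log evenly over num_streams sources; reconstructed from the surrounding context, it looks roughly like this (the iterator declarations are inferred, they are not shown in the diff):

    size_t size = index->blocks.size();
    if (num_streams > size)
        num_streams = size;

    for (size_t stream = 0; stream < num_streams; ++stream)
    {
        /// Each source gets a contiguous, roughly equal slice of the indexed blocks.
        auto begin = index->blocks.begin();
        auto end = index->blocks.begin();
        std::advance(begin, stream * size / num_streams);
        std::advance(end, (stream + 1) * size / num_streams);

        pipes.emplace_back(std::make_shared<StripeLogSource>(
            *this, column_names, context.getSettingsRef().max_read_buffer_size, index, begin, end));
    }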

View File

@ -18,14 +18,14 @@ namespace DB
*/
class StorageStripeLog : public ext::shared_ptr_helper<StorageStripeLog>, public IStorage
{
friend class StripeLogBlockInputStream;
friend class StripeLogSource;
friend class StripeLogBlockOutputStream;
friend struct ext::shared_ptr_helper<StorageStripeLog>;
public:
String getName() const override { return "StripeLog"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -34,6 +34,9 @@
#include <Storages/StorageTinyLog.h>
#include "StorageLogSettings.h"
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
@ -52,17 +55,11 @@ namespace ErrorCodes
}
class TinyLogBlockInputStream final : public IBlockInputStream
class TinyLogSource final : public SourceWithProgress
{
public:
TinyLogBlockInputStream(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
: block_size(block_size_), columns(columns_),
storage(storage_), lock(storage_.rwlock),
max_read_buffer_size(max_read_buffer_size_) {}
String getName() const override { return "TinyLog"; }
Block getHeader() const override
static Block getHeader(const NamesAndTypesList & columns)
{
Block res;
@ -72,14 +69,22 @@ public:
return Nested::flatten(res);
}
TinyLogSource(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
: SourceWithProgress(getHeader(columns_))
, block_size(block_size_), columns(columns_), storage(storage_), lock(storage_.rwlock)
, max_read_buffer_size(max_read_buffer_size_) {}
String getName() const override { return "TinyLog"; }
protected:
Block readImpl() override;
Chunk generate() override;
private:
size_t block_size;
NamesAndTypesList columns;
StorageTinyLog & storage;
std::shared_lock<std::shared_mutex> lock;
bool finished = false;
bool is_finished = false;
size_t max_read_buffer_size;
struct Stream
@ -167,24 +172,24 @@ private:
};
Block TinyLogBlockInputStream::readImpl()
Chunk TinyLogSource::generate()
{
Block res;
if (finished || (!streams.empty() && streams.begin()->second->compressed.eof()))
if (is_finished || (!streams.empty() && streams.begin()->second->compressed.eof()))
{
/** Close the files (before destroying the object).
* When many sources are created but only a few of them are read at the same time,
* the buffers don't waste memory.
*/
finished = true;
is_finished = true;
streams.clear();
return res;
return {};
}
/// If there are no files in the folder, it means that the table is empty.
if (storage.disk->isDirectoryEmpty(storage.table_path))
return res;
return {};
for (const auto & name_type : columns)
{
@ -200,21 +205,22 @@ Block TinyLogBlockInputStream::readImpl()
throw;
}
if (column->size())
if (!column->empty())
res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
}
if (!res || streams.begin()->second->compressed.eof())
{
finished = true;
is_finished = true;
streams.clear();
}
return Nested::flatten(res);
auto flatten = Nested::flatten(res);
return Chunk(flatten.getColumns(), flatten.rows());
}
void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit)
void TinyLogSource::readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit)
{
IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
@ -388,7 +394,7 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const String
}
BlockInputStreams StorageTinyLog::read(
Pipes StorageTinyLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -397,10 +403,15 @@ BlockInputStreams StorageTinyLog::read(
const unsigned /*num_streams*/)
{
check(column_names);
Pipes pipes;
// When reading, we lock the entire storage, because we only have one file
// per column and can't modify it concurrently.
return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
pipes.emplace_back(std::make_shared<TinyLogSource>(
max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
return pipes;
}

View File

@ -17,14 +17,14 @@ namespace DB
*/
class StorageTinyLog : public ext::shared_ptr_helper<StorageTinyLog>, public IStorage
{
friend class TinyLogBlockInputStream;
friend class TinyLogSource;
friend class TinyLogBlockOutputStream;
friend struct ext::shared_ptr_helper<StorageTinyLog>;
public:
String getName() const override { return "TinyLog"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -17,6 +17,8 @@
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Poco/Net/HTTPRequest.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
@ -48,26 +50,27 @@ IStorageURLBase::IStorageURLBase(
namespace
{
class StorageURLBlockInputStream : public IBlockInputStream
class StorageURLSource : public SourceWithProgress
{
public:
StorageURLBlockInputStream(const Poco::URI & uri,
StorageURLSource(const Poco::URI & uri,
const std::string & method,
std::function<void(std::ostream &)> callback,
const String & format,
const String & name_,
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: name(name_)
: SourceWithProgress(sample_block), name(std::move(name_))
{
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
uri,
method,
callback,
std::move(callback),
timeouts,
context.getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
@ -77,6 +80,7 @@ namespace
compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
}
String getName() const override
@ -84,30 +88,30 @@ namespace
return name;
}
Block readImpl() override
Chunk generate() override
{
return reader->read();
}
if (!reader)
return {};
Block getHeader() const override
{
return reader->getHeader();
}
if (!initialized)
reader->readPrefix();
void readPrefixImpl() override
{
reader->readPrefix();
}
initialized = true;
if (auto block = reader->read())
return Chunk(block.getColumns(), block.rows());
void readSuffixImpl() override
{
reader->readSuffix();
reader.reset();
return {};
}
private:
String name;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
bool initialized = false;
};
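
With the removed readPrefixImpl/readSuffixImpl/getHeader lines stripped out, the lifecycle that StorageURLSource::generate() implements reads approximately as follows (a cleaned-up view of the hunk above, not the exact source):

    Chunk generate() override
    {
        if (!reader)
            return {};                          /// already finished

        if (!initialized)
        {
            reader->readPrefix();               /// start the underlying stream lazily
            initialized = true;
        }

        if (auto block = reader->read())
            return Chunk(block.getColumns(), block.rows());

        reader->readSuffix();                   /// no more data: finalize and free the reader
        reader.reset();
        return {};
    }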
class StorageURLBlockOutputStream : public IBlockOutputStream
@ -181,7 +185,7 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(con
}
BlockInputStreams IStorageURLBase::read(const Names & column_names,
Pipes IStorageURLBase::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
@ -193,21 +197,20 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
BlockInputStreamPtr block_input = std::make_shared<StorageURLBlockInputStream>(request_uri,
Pipes pipes;
pipes.emplace_back(std::make_shared<StorageURLSource>(request_uri,
getReadMethod(),
getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
format_name,
getName(),
getHeaderBlock(column_names),
context,
getColumns().getDefaults(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(request_uri.getPath(), compression_method));
chooseCompressionMethod(request_uri.getPath(), compression_method)));
auto column_defaults = getColumns().getDefaults();
if (column_defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
return pipes;
}
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/)

View File

@ -16,7 +16,7 @@ namespace DB
class IStorageURLBase : public IStorage
{
public:
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -2,6 +2,8 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageValues.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Pipe.h>
namespace DB
@ -13,7 +15,7 @@ StorageValues::StorageValues(const StorageID & table_id_, const ColumnsDescripti
setColumns(columns_);
}
BlockInputStreams StorageValues::read(
Pipes StorageValues::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -23,7 +25,12 @@ BlockInputStreams StorageValues::read(
{
check(column_names, true);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res_block));
Pipes pipes;
Chunk chunk(res_block.getColumns(), res_block.rows());
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(res_block.cloneEmpty(), std::move(chunk)));
return pipes;
}
}

View File

@ -15,7 +15,7 @@ class StorageValues : public ext::shared_ptr_helper<StorageValues>, public IStor
public:
std::string getName() const override { return "Values"; }
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -16,6 +16,10 @@
#include <Common/typeid_cast.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/MaterializingTransform.h>
namespace DB
{
@ -43,7 +47,7 @@ StorageView::StorageView(
}
BlockInputStreams StorageView::read(
Pipes StorageView::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -51,7 +55,7 @@ BlockInputStreams StorageView::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
BlockInputStreams res;
Pipes pipes;
ASTPtr current_inner_query = inner_query;
@ -83,15 +87,24 @@ BlockInputStreams StorageView::read(
}
QueryPipeline pipeline;
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);
/// FIXME: res may implicitly use some objects owned by the pipeline, but they will be destructed after return
res = InterpreterSelectWithUnionQuery(current_inner_query, context, {}, column_names).executeWithMultipleStreams(pipeline);
if (query_info.force_tree_shaped_pipeline)
{
BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
}
else
/// TODO: support multiple streams here. This needs a more general interface than pipes.
pipes.emplace_back(interpreter.executeWithProcessors().getPipe());
/// It's expected that the columns read from storage are not constant,
/// because the method 'getSampleBlockForColumns' is used to obtain the structure of the result in InterpreterSelectQuery.
for (auto & stream : res)
stream = std::make_shared<MaterializingBlockInputStream>(stream);
for (auto & pipe : pipes)
pipe.addSimpleTransform(std::make_shared<MaterializingTransform>(pipe.getHeader()));
return res;
return pipes;
}
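
Disentangled from the removed lines, the new control flow of StorageView::read() is roughly the following (a condensed sketch of the hunk above, not the exact source):

    QueryPipeline pipeline;
    InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);

    if (query_info.force_tree_shaped_pipeline)
    {
        /// Legacy path: run the inner query as streams and wrap each one into a source processor.
        BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
        for (auto & stream : streams)
            pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
    }
    else
    {
        /// Processors path: take the inner query's whole pipeline as a single pipe.
        pipes.emplace_back(interpreter.executeWithProcessors().getPipe());
    }

    /// The result columns must not be constant, because getSampleBlockForColumns()
    /// is used to derive the result structure in InterpreterSelectQuery.
    for (auto & pipe : pipes)
        pipe.addSimpleTransform(std::make_shared<MaterializingTransform>(pipe.getHeader()));

    return pipes;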
void StorageView::replaceTableNameWithSubquery(ASTSelectQuery * select_query, ASTPtr & subquery)

Some files were not shown because too many files have changed in this diff.