Merge branch 'master' into CLICKHOUSE-3606

This commit is contained in:
alexey-milovidov 2018-02-22 02:13:38 +03:00 committed by GitHub
commit b6f3f06ef3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
180 changed files with 4567 additions and 1329 deletions

1
.gitignore vendored
View File

@ -12,6 +12,7 @@
build
/docs/en_single_page/
/docs/ru_single_page/
/docs/venv/
# callgrind files
callgrind.out.*

View File

@ -1,6 +1,6 @@
# This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54354-testing)
set(VERSION_REVISION 54354)
set(VERSION_DESCRIBE v1.1.54355-testing)
set(VERSION_REVISION 54355)
# end of autochange
set (VERSION_MAJOR 1)

View File

@ -393,7 +393,7 @@ void Connection::sendData(const Block & block, const String & name)
else
maybe_compressed_out = out;
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision);
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty());
}
writeVarUInt(Protocol::Client::Data, *out);

View File

@ -3,6 +3,7 @@
#include <sys/utsname.h>
#include <cerrno>
#include <cstring>
#include <algorithm>
#include <iostream>
#include <functional>
#include <Poco/DOM/Text.h>
@ -356,7 +357,7 @@ void ConfigProcessor::doIncludesRecursive(
ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string & config_path)
{
Files res;
Files files;
Poco::Path merge_dir_path(config_path);
merge_dir_path.setExtension("d");
@ -378,12 +379,14 @@ ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string &
Poco::File & file = *it;
if (file.isFile() && (endsWith(file.path(), ".xml") || endsWith(file.path(), ".conf")))
{
res.push_back(file.path());
files.push_back(file.path());
}
}
}
return res;
std::sort(files.begin(), files.end());
return files;
}
XMLDocumentPtr ConfigProcessor::processConfig(

View File

@ -2,6 +2,7 @@
#include <string>
#include <unordered_set>
#include <vector>
#include <Poco/DOM/Document.h>
#include <Poco/DOM/DOMParser.h>
@ -87,7 +88,7 @@ public:
void savePreprocessedConfig(const LoadedConfig & loaded_config);
public:
using Files = std::list<std::string>;
using Files = std::vector<std::string>;
static Files getConfigMergeFiles(const std::string & config_path);

View File

@ -6,6 +6,8 @@
#include <IO/Operators.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnConst.h>
#include <iterator>
#include <memory>
@ -18,6 +20,7 @@ namespace ErrorCodes
extern const int POSITION_OUT_OF_BOUND;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@ -276,13 +279,7 @@ std::string Block::dumpStructure() const
{
if (it != data.begin())
out << ", ";
out << it->name << ' ' << it->type->getName();
if (it->column)
out << ' ' << it->column->dumpStructure();
else
out << " nullptr";
it->dumpStructure(out);
}
return out.str();
}
@ -379,22 +376,58 @@ Names Block::getNames() const
}
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
template <typename ReturnType>
static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
{
size_t columns = lhs.columns();
if (rhs.columns() != columns)
return false;
auto on_error = [](const std::string & message [[maybe_unused]], int code [[maybe_unused]])
{
if constexpr (std::is_same_v<ReturnType, void>)
throw Exception(message, code);
else
return false;
};
size_t columns = rhs.columns();
if (lhs.columns() != columns)
return on_error("Block structure mismatch in " + context_description + " stream: different number of columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
for (size_t i = 0; i < columns; ++i)
{
const IDataType & lhs_type = *lhs.safeGetByPosition(i).type;
const IDataType & rhs_type = *rhs.safeGetByPosition(i).type;
const auto & expected = rhs.getByPosition(i);
const auto & actual = lhs.getByPosition(i);
if (!lhs_type.equals(rhs_type))
return false;
if (actual.name != expected.name)
return on_error("Block structure mismatch in " + context_description + " stream: different names of columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
if (!actual.type->equals(*expected.type))
return on_error("Block structure mismatch in " + context_description + " stream: different types:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
if (actual.column->getName() != expected.column->getName())
return on_error("Block structure mismatch in " + context_description + " stream: different columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
if (actual.column->isColumnConst() && expected.column->isColumnConst()
&& static_cast<const ColumnConst &>(*actual.column).getField() != static_cast<const ColumnConst &>(*expected.column).getField())
return on_error("Block structure mismatch in " + context_description + " stream: different values of constants",
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
return true;
return ReturnType(true);
}
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
return checkBlockStructure<bool>(lhs, rhs, {});
}
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
{
checkBlockStructure<void>(lhs, rhs, context_description);
}
@ -453,12 +486,12 @@ void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out
for (auto it = left_columns.rbegin(); it != left_columns.rend(); ++it)
{
lhs_diff_writer << it->prettyPrint();
lhs_diff_writer << it->dumpStructure();
lhs_diff_writer << ", position: " << lhs.getPositionByName(it->name) << '\n';
}
for (auto it = right_columns.rbegin(); it != right_columns.rend(); ++it)
{
rhs_diff_writer << it->prettyPrint();
rhs_diff_writer << it->dumpStructure();
rhs_diff_writer << ", position: " << rhs.getPositionByName(it->name) << '\n';
}
}

View File

@ -137,10 +137,13 @@ using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
/// Compare column types for blocks. The order of the columns matters. Names do not matter.
/// Compare number of columns, data types, column types, column names, and values of constant columns.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
/// Calculate difference in structure of blocks and write description into output strings.
/// Throw exception when blocks are different.
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description);
/// Calculate difference in structure of blocks and write description into output strings. NOTE It doesn't compare values of constant columns.
void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out_lhs_diff, std::string & out_rhs_diff);

View File

@ -1,6 +1,7 @@
#include <Core/ColumnsWithTypeAndName.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
namespace DB
@ -19,7 +20,7 @@ ColumnWithTypeAndName ColumnWithTypeAndName::cloneEmpty() const
}
bool ColumnWithTypeAndName::operator== (const ColumnWithTypeAndName & other) const
bool ColumnWithTypeAndName::operator==(const ColumnWithTypeAndName & other) const
{
return name == other.name
&& ((!type && !other.type) || (type && other.type && type->equals(*other.type)))
@ -27,20 +28,25 @@ bool ColumnWithTypeAndName::operator== (const ColumnWithTypeAndName & other) con
}
String ColumnWithTypeAndName::prettyPrint() const
void ColumnWithTypeAndName::dumpStructure(WriteBuffer & out) const
{
out << name;
if (type)
out << ' ' << type->getName();
else
out << " nullptr";
if (column)
out << ' ' << column->dumpStructure();
else
out << " nullptr";
}
String ColumnWithTypeAndName::dumpStructure() const
{
WriteBufferFromOwnString out;
writeString(name, out);
if (type)
{
writeChar(' ', out);
writeString(type->getName(), out);
}
if (column)
{
writeChar(' ', out);
writeString(column->getName(), out);
}
dumpStructure(out);
return out.str();
}

View File

@ -7,6 +7,9 @@
namespace DB
{
class WriteBuffer;
/** Column data along with its data type and name.
* Column data could be nullptr - to represent just 'header' of column.
* Name could be either name from a table or some temporary generated name during expression evaluation.
@ -28,7 +31,9 @@ struct ColumnWithTypeAndName
ColumnWithTypeAndName cloneEmpty() const;
bool operator==(const ColumnWithTypeAndName & other) const;
String prettyPrint() const;
void dumpStructure(WriteBuffer & out) const;
String dumpStructure() const;
};
}

View File

@ -10,7 +10,7 @@
namespace DB
{
void AddingDefaultBlockOutputStream::write(const DB::Block & block)
void AddingDefaultBlockOutputStream::write(const Block & block)
{
Block res = block;
@ -71,9 +71,6 @@ void AddingDefaultBlockOutputStream::write(const DB::Block & block)
}
/// Computes explicitly specified values (in column_defaults) by default.
/** @todo if somehow block does not contain values for implicitly-defaulted columns that are prerequisites
* for explicitly-defaulted ones, exception will be thrown during evaluating such columns
* (implicitly-defaulted columns are evaluated on the line after following one. */
evaluateMissingDefaults(res, required_columns, column_defaults, context);
output->write(res);

View File

@ -19,16 +19,18 @@ class AddingDefaultBlockOutputStream : public IBlockOutputStream
public:
AddingDefaultBlockOutputStream(
const BlockOutputStreamPtr & output_,
const Block & header_,
NamesAndTypesList required_columns_,
const ColumnDefaults & column_defaults_,
const Context & context_,
bool only_explicit_column_defaults_)
: output(output_), required_columns(required_columns_),
: output(output_), header(header_), required_columns(required_columns_),
column_defaults(column_defaults_), context(context_),
only_explicit_column_defaults(only_explicit_column_defaults_)
{
}
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
@ -38,6 +40,7 @@ public:
private:
BlockOutputStreamPtr output;
Block header;
NamesAndTypesList required_columns;
const ColumnDefaults column_defaults;
const Context & context;

View File

@ -76,7 +76,7 @@ Block AggregatingBlockInputStream::readImpl()
AggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
}

View File

@ -21,8 +21,6 @@ struct BlockIO
BlockInputStreamPtr in;
BlockOutputStreamPtr out;
Block out_sample; /// Example of a block to be written to `out`.
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
@ -50,7 +48,6 @@ struct BlockIO
process_list_entry = rhs.process_list_entry;
in = rhs.in;
out = rhs.out;
out_sample = rhs.out_sample;
finish_callback = rhs.finish_callback;
exception_callback = rhs.exception_callback;

View File

@ -5,8 +5,8 @@
namespace DB
{
BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_)
: row_output(row_output_), first_row(true) {}
BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_)
: row_output(row_output_), header(header_) {}
void BlockOutputStreamFromRowOutputStream::write(const Block & block)

View File

@ -13,7 +13,9 @@ namespace DB
class BlockOutputStreamFromRowOutputStream : public IBlockOutputStream
{
public:
BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_);
BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writePrefix() override { row_output->writePrefix(); }
void writeSuffix() override { row_output->writeSuffix(); }
@ -29,7 +31,8 @@ public:
private:
RowOutputStreamPtr row_output;
bool first_row;
Block header;
bool first_row = true;
};
}

View File

@ -8,11 +8,37 @@ namespace DB
CastTypeBlockInputStream::CastTypeBlockInputStream(
const Context & context_,
const BlockInputStreamPtr & input_,
const Block & reference_definition_)
: context(context_), ref_definition(reference_definition_)
const BlockInputStreamPtr & input,
const Block & reference_definition)
: context(context_)
{
children.emplace_back(input_);
children.emplace_back(input);
Block input_header = input->getHeader();
for (size_t col_num = 0, num_columns = input_header.columns(); col_num < num_columns; ++col_num)
{
const auto & elem = input_header.getByPosition(col_num);
if (!reference_definition.has(elem.name))
{
header.insert(elem);
continue;
}
const auto & ref_column = reference_definition.getByName(elem.name);
/// Force conversion if source and destination types is different.
if (ref_column.type->equals(*elem.type))
{
header.insert(elem);
}
else
{
header.insert({ castColumn(elem, ref_column.type, context), ref_column.type, elem.name });
cast_description.emplace(col_num, ref_column.type);
}
}
}
String CastTypeBlockInputStream::getName() const
@ -27,12 +53,6 @@ Block CastTypeBlockInputStream::readImpl()
if (!block)
return block;
if (!initialized)
{
initialized = true;
initialize(block);
}
if (cast_description.empty())
return block;
@ -53,23 +73,4 @@ Block CastTypeBlockInputStream::readImpl()
return res;
}
void CastTypeBlockInputStream::initialize(const Block & src_block)
{
for (size_t src_col = 0, num_columns = src_block.columns(); src_col < num_columns; ++src_col)
{
const auto & src_column = src_block.getByPosition(src_col);
/// Skip, if it is a problem, it will be detected on the next pipeline stage
if (!ref_definition.has(src_column.name))
continue;
const auto & ref_column = ref_definition.getByName(src_column.name);
/// Force conversion if source and destination types is different.
if (!ref_column.type->equals(*src_column.type))
cast_description.emplace(src_col, ref_column.type);
}
}
}

View File

@ -7,7 +7,7 @@
namespace DB
{
/// Implicitly converts string and numeric values to Enum, numeric types to other numeric types.
/// Implicitly converts types.
class CastTypeBlockInputStream : public IProfilingBlockInputStream
{
public:
@ -17,18 +17,13 @@ public:
String getName() const override;
Block getHeader() const override { return ref_definition; }
protected:
Block readImpl() override;
Block getHeader() const override { return header; }
private:
const Context & context;
Block ref_definition;
Block readImpl() override;
/// Initializes cast_description and prepares tmp_conversion_block
void initialize(const Block & src_block);
bool initialized = false;
const Context & context;
Block header;
/// Describes required conversions on source block
/// Contains column numbers in source block that should be converted

View File

@ -12,7 +12,6 @@ namespace DB
class CountingBlockOutputStream : public IBlockOutputStream
{
public:
CountingBlockOutputStream(const BlockOutputStreamPtr & stream_)
: stream(stream_) {}
@ -31,6 +30,7 @@ public:
return progress;
}
Block getHeader() const override { return stream->getHeader(); }
void write(const Block & block) override;
void writePrefix() override { stream->writePrefix(); }
@ -40,7 +40,6 @@ public:
String getContentType() const override { return stream->getContentType(); }
protected:
BlockOutputStreamPtr stream;
Progress progress;
ProgressCallback progress_callback;

View File

@ -1,5 +1,6 @@
#include <Interpreters/Set.h>
#include <Interpreters/Join.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Storages/IStorage.h>
@ -108,6 +109,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (!done_with_table)
{
block = materializeBlock(block);
table_out->write(block);
rows_to_transfer += block.rows();

View File

@ -21,7 +21,7 @@ public:
String getName() const override
{
return "FilterColumnsBlockInputStream";
return "FilterColumns";
}
Block getHeader() const override;

View File

@ -141,66 +141,66 @@ static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf
FormatSettingsJSON json_settings(settings.output_format_json_quote_64bit_integers, settings.output_format_json_quote_denormals);
if (name == "Native")
return std::make_shared<NativeBlockOutputStream>(buf);
return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);
else if (name == "RowBinary")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<BinaryRowOutputStream>(buf));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<BinaryRowOutputStream>(buf), sample);
else if (name == "TabSeparated" || name == "TSV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample), sample);
else if (name == "TabSeparatedWithNames" || name == "TSVWithNames")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true), sample);
else if (name == "TabSeparatedWithNamesAndTypes" || name == "TSVWithNamesAndTypes")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, true));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, true), sample);
else if (name == "TabSeparatedRaw" || name == "TSVRaw")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRawRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRawRowOutputStream>(buf, sample), sample);
else if (name == "CSV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample), sample);
else if (name == "CSVWithNames")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample, true));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample, true), sample);
else if (name == "Pretty")
return std::make_shared<PrettyBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyCompact")
return std::make_shared<PrettyCompactBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyCompactMonoBlock")
{
BlockOutputStreamPtr dst = std::make_shared<PrettyCompactBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
BlockOutputStreamPtr dst = std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
auto res = std::make_shared<SquashingBlockOutputStream>(dst, settings.output_format_pretty_max_rows, 0);
res->disableFlush();
return res;
}
else if (name == "PrettySpace")
return std::make_shared<PrettySpaceBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyNoEscapes")
return std::make_shared<PrettyBlockOutputStream>(buf, true, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyBlockOutputStream>(buf, sample, true, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyCompactNoEscapes")
return std::make_shared<PrettyCompactBlockOutputStream>(buf, true, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, true, settings.output_format_pretty_max_rows, context);
else if (name == "PrettySpaceNoEscapes")
return std::make_shared<PrettySpaceBlockOutputStream>(buf, true, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, true, settings.output_format_pretty_max_rows, context);
else if (name == "Vertical")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRowOutputStream>(
buf, sample, settings.output_format_pretty_max_rows));
buf, sample, settings.output_format_pretty_max_rows), sample);
else if (name == "VerticalRaw")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRawRowOutputStream>(
buf, sample, settings.output_format_pretty_max_rows));
buf, sample, settings.output_format_pretty_max_rows), sample);
else if (name == "Values")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf), sample);
else if (name == "JSON")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONRowOutputStream>(
buf, sample, settings.output_format_write_statistics, json_settings));
buf, sample, settings.output_format_write_statistics, json_settings), sample);
else if (name == "JSONCompact")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONCompactRowOutputStream>(
buf, sample, settings.output_format_write_statistics, json_settings));
buf, sample, settings.output_format_write_statistics, json_settings), sample);
else if (name == "JSONEachRow")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONEachRowRowOutputStream>(
buf, sample, json_settings));
buf, sample, json_settings), sample);
else if (name == "XML")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<XMLRowOutputStream>(buf, sample,
settings.output_format_write_statistics));
settings.output_format_write_statistics), sample);
else if (name == "TSKV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TSKVRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TSKVRowOutputStream>(buf, sample), sample);
else if (name == "ODBCDriver")
return std::make_shared<ODBCDriverBlockOutputStream>(buf, sample);
else if (name == "Null")
return std::make_shared<NullBlockOutputStream>();
return std::make_shared<NullBlockOutputStream>(sample);
else
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
@ -211,7 +211,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(getOutputImpl(name, buf, sample, context));
return std::make_shared<MaterializingBlockOutputStream>(getOutputImpl(name, buf, materializeBlock(sample), context), sample);
}
}

View File

@ -118,9 +118,7 @@ private:
size_t checkDepthImpl(size_t max_depth, size_t level) const;
/** Get text that identifies this source and the entire subtree.
* Unlike getID - without taking into account the parameters.
*/
/// Get text with names of this source and the entire subtree.
String getTreeID() const;
};

View File

@ -4,12 +4,12 @@
#include <vector>
#include <memory>
#include <boost/noncopyable.hpp>
#include <Core/Block.h>
namespace DB
{
class Block;
struct Progress;
class TableStructureReadLock;
@ -26,6 +26,12 @@ class IBlockOutputStream : private boost::noncopyable
public:
IBlockOutputStream() {}
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* You must pass blocks of exactly this structure to the 'write' method.
*/
virtual Block getHeader() const = 0;
/** Write block.
*/
virtual void write(const Block & block) = 0;

View File

@ -5,6 +5,7 @@
#include <Interpreters/ProcessList.h>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
@ -15,6 +16,7 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@ -70,6 +72,15 @@ Block IProfilingBlockInputStream::read()
progress(Progress(res.rows(), res.bytes()));
#ifndef NDEBUG
if (res)
{
Block header = getHeader();
if (header)
assertBlocksHaveEqualStructure(res, header, getName());
}
#endif
return res;
}

View File

@ -43,7 +43,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out_sample, context.getSettings().max_insert_block_size);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
}
}

View File

@ -12,9 +12,10 @@ namespace DB
class MaterializingBlockOutputStream : public IBlockOutputStream
{
public:
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output)
: output{output} {}
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output, const Block & header)
: output{output}, header(header) {}
Block getHeader() const override { return header; }
void write(const Block & block) override { output->write(materializeBlock(block)); }
void flush() override { output->flush(); }
void writePrefix() override { output->writePrefix(); }
@ -27,6 +28,7 @@ public:
private:
BlockOutputStreamPtr output;
Block header;
};
}

View File

@ -34,29 +34,29 @@ static void removeConstantsFromBlock(Block & block)
}
}
static void removeConstantsFromSortDescription(const Block & sample_block, SortDescription & description)
static void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
{
description.erase(std::remove_if(description.begin(), description.end(),
[&](const SortColumnDescription & elem)
{
if (!elem.column_name.empty())
return sample_block.getByName(elem.column_name).column->isColumnConst();
return header.getByName(elem.column_name).column->isColumnConst();
else
return sample_block.safeGetByPosition(elem.column_number).column->isColumnConst();
return header.safeGetByPosition(elem.column_number).column->isColumnConst();
}), description.end());
}
/** Add into block, whose constant columns was removed by previous function,
* constant columns from sample_block (which must have structure as before removal of constants from block).
* constant columns from header (which must have structure as before removal of constants from block).
*/
static void enrichBlockWithConstants(Block & block, const Block & sample_block)
static void enrichBlockWithConstants(Block & block, const Block & header)
{
size_t rows = block.rows();
size_t columns = sample_block.columns();
size_t columns = header.columns();
for (size_t i = 0; i < columns; ++i)
{
const auto & col_type_name = sample_block.getByPosition(i);
const auto & col_type_name = header.getByPosition(i);
if (col_type_name.column->isColumnConst())
block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
@ -65,6 +65,12 @@ static void enrichBlockWithConstants(Block & block, const Block & sample_block)
Block MergeSortingBlockInputStream::readImpl()
{
if (!header)
{
header = getHeader();
removeConstantsFromSortDescription(header, description);
}
/** Algorithm:
* - read to memory blocks from source stream;
* - if too much of them and if external sorting is enabled,
@ -77,12 +83,6 @@ Block MergeSortingBlockInputStream::readImpl()
{
while (Block block = children.back()->read())
{
if (!sample_block)
{
sample_block = block.cloneEmpty();
removeConstantsFromSortDescription(sample_block, description);
}
/// If there were only const columns in sort description, then there is no need to sort.
/// Return the blocks as is.
if (description.empty())
@ -103,7 +103,7 @@ Block MergeSortingBlockInputStream::readImpl()
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, block.cloneEmpty());
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
@ -148,7 +148,7 @@ Block MergeSortingBlockInputStream::readImpl()
Block res = impl->read();
if (res)
enrichBlockWithConstants(res, sample_block);
enrichBlockWithConstants(res, header);
return res;
}

View File

@ -108,7 +108,7 @@ private:
/// Before operation, will remove constant columns from blocks. And after, place constant columns back.
/// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files)
/// Save original block structure here.
Block sample_block;
Block header;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;

View File

@ -219,10 +219,10 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
parallel_merge_data->merged_blocks_changed.wait(lock, [this]
{
return parallel_merge_data->finish /// Requested to finish early.
|| parallel_merge_data->exception /// An error in merging thread.
|| parallel_merge_data->exhausted /// No more data in sources.
|| !parallel_merge_data->merged_blocks.empty(); /// Have another merged block.
return parallel_merge_data->finish /// Requested to finish early.
|| parallel_merge_data->exception /// An error in merging thread.
|| parallel_merge_data->exhausted /// No more data in sources.
|| !parallel_merge_data->merged_blocks.empty(); /// Have another merged block.
});
if (parallel_merge_data->exception)
@ -493,7 +493,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
while (true)
{
if (current_bucket_num == NUM_BUCKETS)
if (current_bucket_num >= NUM_BUCKETS)
{
/// All ordinary data was processed. Maybe, there are also 'overflows'-blocks.
// std::cerr << "at end\n";

View File

@ -84,7 +84,7 @@ protected:
Block readImpl() override;
private:
static constexpr size_t NUM_BUCKETS = 256;
static constexpr int NUM_BUCKETS = 256;
Aggregator aggregator;
bool final;

View File

@ -11,7 +11,6 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@ -92,19 +91,7 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
if (!*shared_block_ptr)
continue;
size_t src_columns = shared_block_ptr->columns();
size_t dst_columns = header.columns();
if (src_columns != dst_columns)
throw Exception("Merging blocks have different number of columns ("
+ toString(src_columns) + " and " + toString(dst_columns) + ")",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t i = 0; i < src_columns; ++i)
if (!blocksHaveEqualStructure(*shared_block_ptr, header))
throw Exception("Merging blocks have different names or types of columns:\n"
+ shared_block_ptr->dumpStructure() + "\nand\n" + header.dumpStructure(),
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
assertBlocksHaveEqualStructure(*shared_block_ptr, header, getName());
}
merged_columns.resize(num_columns);

View File

@ -52,7 +52,7 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
for (const auto & column : index_block_it->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert({ type->createColumn(), type, column.name });
header.insert(ColumnWithTypeAndName{ type, column.name });
}
}

View File

@ -20,9 +20,9 @@ namespace ErrorCodes
NativeBlockOutputStream::NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer * index_ostr_, size_t initial_size_of_file_)
: ostr(ostr_), client_revision(client_revision_),
: ostr(ostr_), client_revision(client_revision_), header(header_),
index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_)
{
if (index_ostr)

View File

@ -23,9 +23,10 @@ public:
/** If non-zero client_revision is specified, additional block information can be written.
*/
NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_ = 0,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer * index_ostr_ = nullptr, size_t initial_size_of_file_ = 0);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
@ -36,7 +37,7 @@ public:
private:
WriteBuffer & ostr;
UInt64 client_revision;
Block header;
WriteBuffer * index_ostr;
size_t initial_size_of_file; /// The initial size of the data file, if `append` done. Used for the index.
/// If you need to write index, then `ostr` must be a CompressedWriteBuffer.

View File

@ -11,7 +11,12 @@ namespace DB
class NullBlockOutputStream : public IBlockOutputStream
{
public:
NullBlockOutputStream(const Block & header) : header(header) {}
Block getHeader() const override { return header; }
void write(const Block &) override {}
private:
Block header;
};
}

View File

@ -14,32 +14,19 @@ namespace ErrorCodes
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
const BlockInputStreamPtr & input,
const Block & in_sample_, const Block & out_sample_)
: header(out_sample_)
{
buildActions(in_sample_, out_sample_);
children.push_back(input);
}
Block NullableAdapterBlockInputStream::readImpl()
static Block transform(const Block & block, const NullableAdapterBlockInputStream::Actions & actions, const std::vector<std::optional<String>> & rename)
{
Block block = children.back()->read();
if (!block && !must_transform)
return block;
size_t num_columns = block.columns();
Block res;
size_t s = block.columns();
for (size_t i = 0; i < s; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
const auto & elem = block.getByPosition(i);
switch (actions[i])
{
case TO_ORDINARY:
case NullableAdapterBlockInputStream::TO_ORDINARY:
{
const auto & nullable_col = static_cast<const ColumnNullable &>(*elem.column);
const auto & nullable_type = static_cast<const DataTypeNullable &>(*elem.type);
@ -54,11 +41,10 @@ Block NullableAdapterBlockInputStream::readImpl()
res.insert({
nullable_col.getNestedColumnPtr(),
nullable_type.getNestedType(),
rename[i].value_or(elem.name)
});
rename[i].value_or(elem.name)});
break;
}
case TO_NULLABLE:
case NullableAdapterBlockInputStream::TO_NULLABLE:
{
ColumnPtr null_map = ColumnUInt8::create(elem.column->size(), 0);
@ -68,12 +54,9 @@ Block NullableAdapterBlockInputStream::readImpl()
rename[i].value_or(elem.name)});
break;
}
case NONE:
case NullableAdapterBlockInputStream::NONE:
{
if (rename[i])
res.insert({elem.column, elem.type, *rename[i]});
else
res.insert(elem);
res.insert({elem.column, elem.type, rename[i].value_or(elem.name)});
break;
}
}
@ -82,13 +65,34 @@ Block NullableAdapterBlockInputStream::readImpl()
return res;
}
void NullableAdapterBlockInputStream::buildActions(
const Block & in_sample,
const Block & out_sample)
{
size_t in_size = in_sample.columns();
if (out_sample.columns() != in_size)
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
const BlockInputStreamPtr & input,
const Block & src_header, const Block & res_header)
{
buildActions(src_header, res_header);
children.push_back(input);
header = transform(src_header, actions, rename);
}
Block NullableAdapterBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block)
return block;
return transform(block, actions, rename);
}
void NullableAdapterBlockInputStream::buildActions(
const Block & src_header,
const Block & res_header)
{
size_t in_size = src_header.columns();
if (res_header.columns() != in_size)
throw Exception("Number of columns in INSERT SELECT doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
actions.reserve(in_size);
@ -96,8 +100,8 @@ void NullableAdapterBlockInputStream::buildActions(
for (size_t i = 0; i < in_size; ++i)
{
const auto & in_elem = in_sample.getByPosition(i);
const auto & out_elem = out_sample.getByPosition(i);
const auto & in_elem = src_header.getByPosition(i);
const auto & out_elem = res_header.getByPosition(i);
bool is_in_nullable = in_elem.type->isNullable();
bool is_out_nullable = out_elem.type->isNullable();
@ -113,9 +117,6 @@ void NullableAdapterBlockInputStream::buildActions(
rename.emplace_back(std::make_optional(out_elem.name));
else
rename.emplace_back();
if (actions.back() != NONE || rename.back())
must_transform = true;
}
}

View File

@ -18,16 +18,13 @@ namespace DB
class NullableAdapterBlockInputStream : public IProfilingBlockInputStream
{
public:
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_, const Block & out_sample_);
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & src_header, const Block & res_header);
String getName() const override { return "NullableAdapterBlockInputStream"; }
String getName() const override { return "NullableAdapter"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
/// Given a column of a block we have just read,
/// how must we process it?
enum Action
@ -44,17 +41,17 @@ private:
using Actions = std::vector<Action>;
private:
Block readImpl() override;
/// Determine the actions to be taken using the source sample block,
/// which describes the columns from which we fetch data inside an INSERT
/// query, and the target sample block which contains the columns
/// we insert data into.
void buildActions(const Block & in_sample, const Block & out_sample);
void buildActions(const Block & src_header, const Block & res_header);
private:
Block header;
Actions actions;
std::vector<std::optional<String>> rename;
bool must_transform = false;
};
}

View File

@ -7,9 +7,8 @@
namespace DB
{
ODBCDriverBlockOutputStream::ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & sample_)
: out(out_)
, sample(sample_)
ODBCDriverBlockOutputStream::ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & header_)
: out(out_), header(header_)
{
}
@ -43,7 +42,7 @@ void ODBCDriverBlockOutputStream::write(const Block & block)
void ODBCDriverBlockOutputStream::writePrefix()
{
const size_t columns = sample.columns();
const size_t columns = header.columns();
/// Number of columns.
writeVarUInt(columns, out);
@ -51,7 +50,7 @@ void ODBCDriverBlockOutputStream::writePrefix()
/// Names and types of columns.
for (size_t i = 0; i < columns; ++i)
{
const ColumnWithTypeAndName & col = sample.getByPosition(i);
const ColumnWithTypeAndName & col = header.getByPosition(i);
writeStringBinary(col.name, out);
writeStringBinary(col.type->getName(), out);

View File

@ -19,8 +19,9 @@ class WriteBuffer;
class ODBCDriverBlockOutputStream : public IBlockOutputStream
{
public:
ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & sample_);
ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & header_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writePrefix() override;
@ -29,7 +30,7 @@ public:
private:
WriteBuffer & out;
const Block sample;
const Block header;
};
}

View File

@ -100,7 +100,8 @@ Block ParallelAggregatingBlockInputStream::readImpl()
ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}

View File

@ -17,8 +17,9 @@ namespace ErrorCodes
}
PrettyBlockOutputStream::PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_)
: ostr(ostr_), max_rows(max_rows_), no_escapes(no_escapes_), context(context_)
PrettyBlockOutputStream::PrettyBlockOutputStream(
WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_)
: ostr(ostr_), header(header_), max_rows(max_rows_), no_escapes(no_escapes_), context(context_)
{
struct winsize w;
if (0 == ioctl(STDOUT_FILENO, TIOCGWINSZ, &w))

View File

@ -17,8 +17,9 @@ class PrettyBlockOutputStream : public IBlockOutputStream
{
public:
/// no_escapes - do not use ANSI escape sequences - to display in the browser, not in the console.
PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_);
PrettyBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writeSuffix() override;
@ -32,6 +33,7 @@ protected:
void writeExtremes();
WriteBuffer & ostr;
const Block header;
size_t max_rows;
size_t total_rows = 0;
size_t terminal_width = 0;

View File

@ -11,8 +11,8 @@ namespace DB
class PrettyCompactBlockOutputStream : public PrettyBlockOutputStream
{
public:
PrettyCompactBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_, context_) {}
PrettyCompactBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, header_, no_escapes_, max_rows_, context_) {}
void write(const Block & block) override;

View File

@ -11,8 +11,8 @@ namespace DB
class PrettySpaceBlockOutputStream : public PrettyBlockOutputStream
{
public:
PrettySpaceBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_, context_) {}
PrettySpaceBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, header_, no_escapes_, max_rows_, context_) {}
void write(const Block & block) override;
void writeSuffix() override;

View File

@ -1,23 +0,0 @@
#include <Core/Block.h>
#include <DataStreams/ProhibitColumnsBlockOutputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
void ProhibitColumnsBlockOutputStream::write(const Block & block)
{
for (const auto & column : columns)
if (block.has(column.name))
throw Exception{"Cannot insert column " + column.name, ErrorCodes::ILLEGAL_COLUMN};
output->write(block);
}
}

View File

@ -1,31 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
/// Throws exception on encountering prohibited column in block
class ProhibitColumnsBlockOutputStream : public IBlockOutputStream
{
public:
ProhibitColumnsBlockOutputStream(const BlockOutputStreamPtr & output, const NamesAndTypesList & columns)
: output{output}, columns{columns}
{
}
private:
void write(const Block & block) override;
void flush() override { output->flush(); }
void writePrefix() override { output->writePrefix(); }
void writeSuffix() override { output->writeSuffix(); }
BlockOutputStreamPtr output;
NamesAndTypesList columns;
};
}

View File

@ -1,4 +1,5 @@
#include "PushingToViewsBlockOutputStream.h"
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
@ -6,8 +7,8 @@ namespace DB
{
PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
String database, String table, StoragePtr storage,
const Context & context_, const ASTPtr & query_ptr_, bool no_destination)
const String & database, const String & table, const StoragePtr & storage,
const Context & context_, const ASTPtr & query_ptr_, bool no_destination)
: context(context_), query_ptr(query_ptr_)
{
/** TODO This is a very important line. At any insertion into the table one of streams should own lock.

View File

@ -4,7 +4,6 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageMaterializedView.h>
@ -19,9 +18,11 @@ class ReplicatedMergeTreeBlockOutputStream;
class PushingToViewsBlockOutputStream : public IBlockOutputStream
{
public:
PushingToViewsBlockOutputStream(String database, String table, StoragePtr storage,
PushingToViewsBlockOutputStream(
const String & database, const String & table, const StoragePtr & storage,
const Context & context_, const ASTPtr & query_ptr_, bool no_destination = false);
Block getHeader() const override { return storage->getSampleBlock(); }
void write(const Block & block) override;
void flush() override

View File

@ -19,25 +19,18 @@ namespace ErrorCodes
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_)
: connection(connection_), query(query_), settings(settings_)
{
}
void RemoteBlockOutputStream::writePrefix()
{
/** Send query and receive "sample block", that describe table structure.
* Sample block is needed to know, what structure is required for blocks to be passed to 'write' method.
/** Send query and receive "header", that describe table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
query_sent = true;
connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr);
Connection::Packet packet = connection.receivePacket();
if (Protocol::Server::Data == packet.type)
{
sample_block = packet.block;
header = packet.block;
if (!sample_block)
if (!header)
throw Exception("Logical error: empty block received as table structure", ErrorCodes::LOGICAL_ERROR);
}
else if (Protocol::Server::Exception == packet.type)
@ -47,32 +40,20 @@ void RemoteBlockOutputStream::writePrefix()
}
else
throw NetException("Unexpected packet from server (expected Data or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
void RemoteBlockOutputStream::write(const Block & block)
{
if (!sample_block)
throw Exception("You must call IBlockOutputStream::writePrefix before IBlockOutputStream::write", ErrorCodes::LOGICAL_ERROR);
if (!blocksHaveEqualStructure(block, sample_block))
{
std::stringstream message;
message << "Block structure is different from table structure.\n"
<< "\nTable structure:\n(" << sample_block.dumpStructure() << ")\nBlock structure:\n(" << block.dumpStructure() << ")\n";
LOG_ERROR(&Logger::get("RemoteBlockOutputStream"), message.str());
throw DB::Exception(message.str());
}
assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream");
connection.sendData(block);
}
void RemoteBlockOutputStream::writePrepared(ReadBuffer & input, size_t size)
{
/// We cannot use 'sample_block'. Input must contain block with proper structure.
/// We cannot use 'header'. Input must contain block with proper structure.
connection.sendPreparedData(input, size);
}
@ -100,9 +81,19 @@ void RemoteBlockOutputStream::writeSuffix()
RemoteBlockOutputStream::~RemoteBlockOutputStream()
{
/// If interrupted in the middle of the loop of communication with the server, then interrupt the connection
if (query_sent && !finished)
connection.disconnect();
/// If interrupted in the middle of the loop of communication with the server, then interrupt the connection,
/// to not leave the connection in unsynchronized state.
if (!finished)
{
try
{
connection.disconnect();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}

View File

@ -20,14 +20,8 @@ class RemoteBlockOutputStream : public IBlockOutputStream
public:
RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_ = nullptr);
Block getHeader() const override { return header; }
/// You can call this method after 'writePrefix', to get table required structure. (You must send data with that structure).
Block getSampleBlock() const
{
return sample_block;
}
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
@ -40,8 +34,7 @@ private:
Connection & connection;
String query;
const Settings * settings;
Block sample_block;
bool query_sent = false;
Block header;
bool finished = false;
};

View File

@ -17,7 +17,7 @@ void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns,
if (out_row_sources_buf)
{
/// true flag value means "skip row"
current_row_sources.back().setSkipFlag(false);
current_row_sources[max_pos].setSkipFlag(false);
out_row_sources_buf->write(reinterpret_cast<const char *>(current_row_sources.data()),
current_row_sources.size() * sizeof(RowSourcePart));
@ -96,6 +96,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
}
/// Initially, skip all rows. Unskip last on insert.
size_t current_pos = current_row_sources.size();
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);
@ -103,6 +104,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
if (version >= max_version)
{
max_version = version;
max_pos = current_pos;
setRowRef(selected_row, current);
}

View File

@ -43,6 +43,7 @@ private:
RowRef selected_row; /// Last row with maximum version for current primary key.
UInt64 max_version = 0; /// Max version for current primary key.
size_t max_pos = 0;
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key

View File

@ -14,6 +14,7 @@ class SquashingBlockOutputStream : public IBlockOutputStream
public:
SquashingBlockOutputStream(BlockOutputStreamPtr & dst, size_t min_block_size_rows, size_t min_block_size_bytes);
Block getHeader() const override { return output->getHeader(); }
void write(const Block & block) override;
void flush() override;

View File

@ -82,6 +82,14 @@ public:
children = inputs;
if (additional_input_at_end)
children.push_back(additional_input_at_end);
size_t num_children = children.size();
if (num_children > 1)
{
Block header = children.at(0)->getHeader();
for (size_t i = 1; i < num_children; ++i)
assertBlocksHaveEqualStructure(children[i]->getHeader(), header, "UNION");
}
}
String getName() const override { return "Union"; }

View File

@ -44,7 +44,7 @@ try
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, 0);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample);
BlockOutputStreamFromRowOutputStream block_output(row_output);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);
copyData(block_input, block_output);
}

View File

@ -56,7 +56,7 @@ try
WriteBufferFromOStream out1(std::cout);
RowOutputStreamPtr out2 = std::make_shared<TabSeparatedRowOutputStream>(out1, expression->getSampleBlock());
BlockOutputStreamFromRowOutputStream out(out2);
BlockOutputStreamFromRowOutputStream out(out2, expression->getSampleBlock());
{
Stopwatch stopwatch;

View File

@ -61,7 +61,7 @@ try
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = std::make_shared<TabSeparatedRowOutputStream>(ob, expression->getSampleBlock());
BlockOutputStreamFromRowOutputStream out(out_);
BlockOutputStreamFromRowOutputStream out(out_, expression->getSampleBlock());
{

View File

@ -134,7 +134,7 @@ int main(int, char **)
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = std::make_shared<TabSeparatedRowOutputStream>(ob, expression->getSampleBlock());
BlockOutputStreamFromRowOutputStream out(out_);
BlockOutputStreamFromRowOutputStream out(out_, in->getHeader());
copyData(*in, out);
}

View File

@ -106,7 +106,7 @@ try
BlockInputStreamPtr in = table->read(column_names, {}, Context::createGlobal(), stage, 8192, 1)[0];
WriteBufferFromFileDescriptor out1(STDOUT_FILENO);
CompressedWriteBuffer out2(out1);
NativeBlockOutputStream out3(out2, ClickHouseRevision::get());
NativeBlockOutputStream out3(out2, ClickHouseRevision::get(), in->getHeader());
copyData(*in, out3);
}

View File

@ -152,7 +152,7 @@ try
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = std::make_shared<TabSeparatedRowOutputStream>(ob, sample);
BlockOutputStreamFromRowOutputStream out(out_);
BlockOutputStreamFromRowOutputStream out(out_, sample);
copyData(*in, out);

View File

@ -38,7 +38,7 @@ try
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, 0);
BlockOutputStreamFromRowOutputStream block_output(row_output);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);
copyData(block_input, block_output);
return 0;

View File

@ -43,7 +43,7 @@ public:
String getName() const override
{
return "DictionaryBlockInputStream";
return "Dictionary";
}
protected:

View File

@ -30,7 +30,7 @@ public:
String getName() const override
{
return "RangeDictionaryBlockInputStream";
return "RangeDictionary";
}
protected:

View File

@ -1111,6 +1111,8 @@ public:
return std::make_shared<DataTypeUUID>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
auto col_res = ColumnVector<UInt128>::create();

View File

@ -1208,6 +1208,8 @@ public:
return std::make_shared<DataTypeDateTime>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt32().createColumnConst(
@ -1235,6 +1237,8 @@ public:
return std::make_shared<DataTypeDate>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt16().createColumnConst(
@ -1262,6 +1266,8 @@ public:
return std::make_shared<DataTypeDate>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt16().createColumnConst(

View File

@ -218,6 +218,8 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
/// The dictionary key that defines the "point of view".
@ -312,6 +314,8 @@ public:
return std::make_shared<DataTypeUInt8>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
/// The dictionary key that defines the "point of view".
@ -446,6 +450,8 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
/// The dictionary key that defines the "point of view".
@ -720,6 +726,8 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
RegionsNames::Language language = RegionsNames::Language::RU;

View File

@ -94,6 +94,8 @@ private:
return std::make_shared<DataTypeUInt8>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -274,6 +276,8 @@ private:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -535,6 +539,8 @@ private:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -821,6 +827,8 @@ private:
return std::make_shared<DataType>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -1134,6 +1142,8 @@ private:
return std::make_shared<DataType>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -1379,6 +1389,8 @@ private:
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -1549,6 +1561,8 @@ private:
return std::make_shared<DataTypeUInt8>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());

View File

@ -23,6 +23,8 @@ public:
bool isVariadic() const override { return true; }
bool isDeterministic() override { return false; }
size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;

View File

@ -104,6 +104,8 @@ public:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, const size_t result) override
{
block.getByPosition(result).column = DataTypeString().createColumnConst(block.rows(), db_name);
@ -126,6 +128,8 @@ public:
return name;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -391,6 +395,8 @@ public:
return name;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -434,6 +440,8 @@ public:
return 0;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -482,6 +490,8 @@ public:
return 0;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -524,6 +534,8 @@ public:
return 0;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -889,6 +901,8 @@ public:
}
/** It could return many different values for single argument. */
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -1288,6 +1302,8 @@ public:
return std::make_shared<DataTypeUInt32>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt32().createColumnConst(block.rows(), static_cast<UInt64>(uptime));
@ -1323,6 +1339,8 @@ public:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeString().createColumnConst(block.rows(), DateLUT::instance().getTimeZone());
@ -1355,6 +1373,8 @@ public:
return 1;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -1632,6 +1652,8 @@ public:
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override;
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override;
private:

View File

@ -126,6 +126,9 @@ public:
* (even for distributed query), but not deterministic it general.
* Example: now(). Another example: functions that work with periodically updated dictionaries.
*/
virtual bool isDeterministic() { return true; }
virtual bool isDeterministicInScopeOfQuery() { return true; }
/** Lets you know if the function is monotonic in a range of values.
@ -320,6 +323,8 @@ public:
bool isInjective(const Block & sample_block) override { return function->isInjective(sample_block); }
bool isDeterministic() override { return function->isDeterministic(); }
bool isDeterministicInScopeOfQuery() override { return function->isDeterministicInScopeOfQuery(); }
bool hasInformationAboutMonotonicity() const override { return function->hasInformationAboutMonotonicity(); }

View File

@ -5,8 +5,6 @@
#include <common/likely.h>
#include <double-conversion/double-conversion.h>
#include <common/iostream_debug_helpers.h>
/** Methods for reading floating point numbers from text with decimal representation.
* There are "precise", "fast" and "simple" implementations.

View File

@ -14,6 +14,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
@ -130,7 +131,7 @@ Block Aggregator::getHeader(bool final) const
}
}
return res;
return materializeBlock(res);
}
@ -840,7 +841,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
const std::string & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get());
NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get(), getHeader(false));
LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << ".");
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);

View File

@ -29,7 +29,7 @@ private:
const Block header;
QueryProcessingStage::Enum processed_stage;
QualifiedTableName main_table;
const Tables & external_tables;
Tables external_tables;
};
}

View File

@ -142,39 +142,11 @@ ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<const Join> join
}
ExpressionActions::Actions ExpressionAction::getPrerequisites(Block & sample_block)
{
ExpressionActions::Actions res;
if (type == APPLY_FUNCTION)
{
if (sample_block.has(result_name))
throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
ColumnsWithTypeAndName arguments(argument_names.size());
for (size_t i = 0; i < argument_names.size(); ++i)
{
if (!sample_block.has(argument_names[i]))
throw Exception("Unknown identifier: '" + argument_names[i] + "'", ErrorCodes::UNKNOWN_IDENTIFIER);
arguments[i] = sample_block.getByName(argument_names[i]);
}
function = function_builder->build(arguments);
result_type = function->getReturnType();
}
return res;
}
void ExpressionAction::prepare(Block & sample_block)
{
// std::cerr << "preparing: " << toString() << std::endl;
/** Constant expressions should be evaluated, and put the result in sample_block.
* For non-constant columns, put the nullptr as the column in sample_block.
*
* The fact that only for constant expressions column != nullptr,
* can be used later when optimizing the query.
*/
switch (type)
@ -567,40 +539,43 @@ void ExpressionActions::addInput(const NameAndTypePair & column)
void ExpressionActions::add(const ExpressionAction & action, Names & out_new_columns)
{
NameSet temp_names;
addImpl(action, temp_names, out_new_columns);
addImpl(action, out_new_columns);
}
void ExpressionActions::add(const ExpressionAction & action)
{
NameSet temp_names;
Names new_names;
addImpl(action, temp_names, new_names);
addImpl(action, new_names);
}
void ExpressionActions::addImpl(ExpressionAction action, NameSet & current_names, Names & new_names)
void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
{
if (sample_block.has(action.result_name))
return;
if (current_names.count(action.result_name))
throw Exception("Cyclic function prerequisites: " + action.result_name, ErrorCodes::LOGICAL_ERROR);
current_names.insert(action.result_name);
if (action.result_name != "")
new_names.push_back(action.result_name);
new_names.insert(new_names.end(), action.array_joined_columns.begin(), action.array_joined_columns.end());
Actions prerequisites = action.getPrerequisites(sample_block);
if (action.type == ExpressionAction::APPLY_FUNCTION)
{
if (sample_block.has(action.result_name))
throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
for (size_t i = 0; i < prerequisites.size(); ++i)
addImpl(prerequisites[i], current_names, new_names);
ColumnsWithTypeAndName arguments(action.argument_names.size());
for (size_t i = 0; i < action.argument_names.size(); ++i)
{
if (!sample_block.has(action.argument_names[i]))
throw Exception("Unknown identifier: '" + action.argument_names[i] + "'", ErrorCodes::UNKNOWN_IDENTIFIER);
arguments[i] = sample_block.getByName(action.argument_names[i]);
}
action.function = action.function_builder->build(arguments);
action.result_type = action.function->getReturnType();
}
action.prepare(sample_block);
actions.push_back(action);
current_names.erase(action.result_name);
}
void ExpressionActions::prependProjectInput()
@ -1017,19 +992,11 @@ void ExpressionActions::optimizeArrayJoin()
}
BlockInputStreamPtr ExpressionActions::createStreamWithNonJoinedDataIfFullOrRightJoin(size_t max_block_size) const
BlockInputStreamPtr ExpressionActions::createStreamWithNonJoinedDataIfFullOrRightJoin(const Block & source_header, size_t max_block_size) const
{
for (const auto & action : actions)
{
if (action.join && (action.join->getKind() == ASTTableJoin::Kind::Full || action.join->getKind() == ASTTableJoin::Kind::Right))
{
Block left_sample_block;
for (const auto & input_elem : input_columns)
left_sample_block.insert(ColumnWithTypeAndName{ input_elem.type, input_elem.name });
return action.join->createStreamWithNonJoinedRows(left_sample_block, max_block_size);
}
}
return action.join->createStreamWithNonJoinedRows(source_header, max_block_size);
return {};
}

View File

@ -99,7 +99,6 @@ public:
static ExpressionAction ordinaryJoin(std::shared_ptr<const Join> join_, const NamesAndTypesList & columns_added_by_join_);
/// Which columns necessary to perform this action.
/// If this `Action` is not already added to `ExpressionActions`, the returned list may be incomplete, because `prerequisites` are not taken into account.
Names getNeededColumns() const;
std::string toString() const;
@ -107,7 +106,6 @@ public:
private:
friend class ExpressionActions;
std::vector<ExpressionAction> getPrerequisites(Block & sample_block);
void prepare(Block & sample_block);
void execute(Block & block) const;
void executeOnTotals(Block & block) const;
@ -147,8 +145,7 @@ public:
void add(const ExpressionAction & action);
/// Adds new column names to out_new_columns
/// (formed as a result of the added action and its prerequisites).
/// Adds new column names to out_new_columns (formed as a result of the added action).
void add(const ExpressionAction & action, Names & out_new_columns);
/// Adds to the beginning the removal of all extra columns.
@ -198,7 +195,7 @@ public:
static std::string getSmallestColumn(const NamesAndTypesList & columns);
BlockInputStreamPtr createStreamWithNonJoinedDataIfFullOrRightJoin(size_t max_block_size) const;
BlockInputStreamPtr createStreamWithNonJoinedDataIfFullOrRightJoin(const Block & source_header, size_t max_block_size) const;
private:
NamesAndTypesList input_columns;
@ -208,9 +205,7 @@ private:
void checkLimits(Block & block) const;
/// Adds all `prerequisites` first, then the action itself.
/// current_names - columns whose `prerequisites` are currently being processed.
void addImpl(ExpressionAction action, NameSet & current_names, Names & new_names);
void addImpl(ExpressionAction action, Names & new_names);
/// Try to improve something without changing the lists of input and output columns.
void optimize();

View File

@ -802,84 +802,24 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name_or_t
StoragePtr external_storage = StorageMemory::create(external_table_name, columns, NamesAndTypesList{}, NamesAndTypesList{}, ColumnDefaults{});
external_storage->startup();
/** There are two ways to perform distributed GLOBAL subqueries.
*
* "push" method:
* Subquery data is sent to all remote servers, where they are then used.
* For this method, the data is sent in the form of "external tables" and will be available on each remote server by the name of the type _data1.
* Replace in the query a subquery for this name.
*
* "pull" method:
* Remote servers download the subquery data from the request initiating server.
* For this method, replace the subquery with another subquery of the form (SELECT * FROM remote ('host: port', _query_QUERY_ID, _data1))
* This subquery, in fact, says - "you need to download data from there."
*
* The "pull" method takes precedence, because in it a remote server can decide that it does not need data and does not download it in such cases.
*/
/** We replace the subquery with the name of the temporary table.
* It is in this form, the request will go to the remote server.
* This temporary table will go to the remote server, and on its side,
* instead of doing a subquery, you just need to read it.
*/
if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH)
auto database_and_table_name = std::make_shared<ASTIdentifier>(StringRange(), external_table_name, ASTIdentifier::Table);
if (auto ast_table_expr = typeid_cast<ASTTableExpression *>(subquery_or_table_name_or_table_expression.get()))
{
/** We replace the subquery with the name of the temporary table.
* It is in this form, the request will go to the remote server.
* This temporary table will go to the remote server, and on its side,
* instead of doing a subquery, you just need to read it.
*/
ast_table_expr->subquery.reset();
ast_table_expr->database_and_table_name = database_and_table_name;
auto database_and_table_name = std::make_shared<ASTIdentifier>(StringRange(), external_table_name, ASTIdentifier::Table);
if (auto ast_table_expr = typeid_cast<ASTTableExpression *>(subquery_or_table_name_or_table_expression.get()))
{
ast_table_expr->subquery.reset();
ast_table_expr->database_and_table_name = database_and_table_name;
ast_table_expr->children.clear();
ast_table_expr->children.emplace_back(database_and_table_name);
}
else
subquery_or_table_name_or_table_expression = database_and_table_name;
}
else if (settings.global_subqueries_method == GlobalSubqueriesMethod::PULL)
{
throw Exception("Support for 'pull' method of execution of global subqueries is disabled.", ErrorCodes::SUPPORT_IS_DISABLED);
/// TODO
/* String host_port = getFQDNOrHostName() + ":" + toString(context.getTCPPort());
String database = "_query_" + context.getCurrentQueryId();
auto subquery = std::make_shared<ASTSubquery>();
subquery_or_table_name = subquery;
auto select = std::make_shared<ASTSelectQuery>();
subquery->children.push_back(select);
auto exp_list = std::make_shared<ASTExpressionList>();
select->select_expression_list = exp_list;
select->children.push_back(select->select_expression_list);
Names column_names = external_storage->getColumnNamesList();
for (const auto & name : column_names)
exp_list->children.push_back(std::make_shared<ASTIdentifier>(StringRange(), name));
auto table_func = std::make_shared<ASTFunction>();
select->table = table_func;
select->children.push_back(select->table);
table_func->name = "remote";
auto args = std::make_shared<ASTExpressionList>();
table_func->arguments = args;
table_func->children.push_back(table_func->arguments);
auto address_lit = std::make_shared<ASTLiteral>(StringRange(), host_port);
args->children.push_back(address_lit);
auto database_lit = std::make_shared<ASTLiteral>(StringRange(), database);
args->children.push_back(database_lit);
auto table_lit = std::make_shared<ASTLiteral>(StringRange(), external_table_name);
args->children.push_back(table_lit);*/
ast_table_expr->children.clear();
ast_table_expr->children.emplace_back(database_and_table_name);
}
else
throw Exception("Unknown global subqueries execution method", ErrorCodes::UNKNOWN_GLOBAL_SUBQUERIES_METHOD);
subquery_or_table_name_or_table_expression = database_and_table_name;
external_tables[external_table_name] = external_storage;
subqueries_for_sets[external_table_name].source = interpreter->execute().in;
@ -2637,25 +2577,6 @@ void ExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & chain) con
}
Block ExpressionAnalyzer::getSelectSampleBlock()
{
assertSelect();
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(aggregated_columns, settings);
NamesWithAliases result_columns;
ASTs asts = select_query->select_expression_list->children;
for (size_t i = 0; i < asts.size(); ++i)
{
result_columns.emplace_back(asts[i]->getColumnName(), asts[i]->getAliasOrColumnName());
getRootActions(asts[i], true, false, temp_actions);
}
temp_actions->add(ExpressionAction::project(result_columns));
return temp_actions->getSampleBlock();
}
void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, ExpressionActionsPtr & actions, bool no_subqueries)
{
ASTFunction * node = typeid_cast<ASTFunction *>(ast.get());

View File

@ -135,9 +135,6 @@ public:
*/
const Tables & getExternalTables() const { return external_tables; }
/// If ast is a SELECT query, it gets the aliases and column types from the SELECT section.
Block getSelectSampleBlock();
/// Create Set-s that we can from IN section to use the index on them.
void makeSetsForIndex();

View File

@ -92,49 +92,6 @@ InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, const Co
{
}
Block InterpreterCheckQuery::getSampleBlock() const
{
Block block;
ColumnWithTypeAndName col;
col.name = "status";
col.type = std::make_shared<DataTypeUInt8>();
col.column = col.type->createColumn();
block.insert(col);
col.name = "host_name";
col.type = std::make_shared<DataTypeString>();
col.column = col.type->createColumn();
block.insert(col);
col.name = "host_address";
col.type = std::make_shared<DataTypeString>();
col.column = col.type->createColumn();
block.insert(col);
col.name = "port";
col.type = std::make_shared<DataTypeUInt16>();
col.column = col.type->createColumn();
block.insert(col);
col.name = "user";
col.type = std::make_shared<DataTypeString>();
col.column = col.type->createColumn();
block.insert(col);
col.name = "structure_class";
col.type = std::make_shared<DataTypeUInt32>();
col.column = col.type->createColumn();
block.insert(col);
col.name = "structure";
col.type = std::make_shared<DataTypeString>();
col.column = col.type->createColumn();
block.insert(col);
return block;
}
BlockIO InterpreterCheckQuery::execute()
{

View File

@ -16,9 +16,6 @@ public:
BlockIO execute() override;
private:
Block getSampleBlock() const;
private:
ASTPtr query_ptr;

View File

@ -10,17 +10,12 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/ProhibitColumnsBlockOutputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
@ -33,6 +28,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/NestedUtils.h>
@ -43,8 +39,10 @@
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DIRECTORY_DOESNT_EXIST;
@ -474,13 +472,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.select && (create.is_view || create.is_materialized_view))
create.select->setDatabaseIfNeeded(current_database);
std::unique_ptr<InterpreterSelectQuery> interpreter_select;
Block as_select_sample;
if (create.select && (!create.attach || !create.columns))
{
interpreter_select = std::make_unique<InterpreterSelectQuery>(create.select->clone(), context);
as_select_sample = interpreter_select->getSampleBlock();
}
as_select_sample = InterpreterSelectQuery::getSampleBlock(create.select->clone(), context);
String as_database_name = create.as_database.empty() ? current_database : create.as_database;
String as_table_name = create.as_table;
@ -554,28 +548,15 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.select && !create.attach
&& !create.is_view && (!create.is_materialized_view || create.is_populate))
{
auto table_lock = res->lockStructure(true, __PRETTY_FUNCTION__);
auto insert = std::make_shared<ASTInsertQuery>();
/// Also see InterpreterInsertQuery.
BlockOutputStreamPtr out;
if (!create.is_temporary)
insert->database = database_name;
out = std::make_shared<PushingToViewsBlockOutputStream>(
create.database, create.table, res, create.is_temporary ? context.getSessionContext() : context, query_ptr);
insert->table = table_name;
insert->select = create.select->clone();
out = std::make_shared<MaterializingBlockOutputStream>(out);
/// @note shouldn't these two contexts be session contexts in case of temporary table?
bool strict_insert_defaults = static_cast<bool>(context.getSettingsRef().strict_insert_defaults);
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, columns.columns, columns.column_defaults, context, strict_insert_defaults);
if (!context.getSettingsRef().insert_allow_materialized_columns)
out = std::make_shared<ProhibitColumnsBlockOutputStream>(out, columns.materialized_columns);
BlockIO io;
io.in = std::make_shared<NullAndDoCopyBlockInputStream>(interpreter_select->execute().in, out);
return io;
return InterpreterInsertQuery(insert, context.getSessionContext(), context.getSettingsRef().insert_allow_materialized_columns).execute();
}
return {};

View File

@ -8,7 +8,6 @@
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/NullableAdapterBlockInputStream.h>
#include <DataStreams/ProhibitColumnsBlockOutputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
@ -23,17 +22,20 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTFunction.h>
namespace ProfileEvents
{
extern const Event InsertQuery;
extern const Event InsertQuery;
}
namespace DB
{
namespace ErrorCodes
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int READONLY;
extern const int ILLEGAL_COLUMN;
}
@ -45,10 +47,8 @@ InterpreterInsertQuery::InterpreterInsertQuery(
}
StoragePtr InterpreterInsertQuery::loadTable()
StoragePtr InterpreterInsertQuery::getTable(const ASTInsertQuery & query)
{
ASTInsertQuery & query = typeid_cast<ASTInsertQuery &>(*query_ptr);
if (query.table_function)
{
auto table_function = typeid_cast<const ASTFunction *>(query.table_function.get());
@ -56,27 +56,19 @@ StoragePtr InterpreterInsertQuery::loadTable()
return factory.get(table_function->name, context)->execute(query.table_function, context);
}
/// In what table to write.
/// Into what table to write.
return context.getTable(query.database, query.table);
}
StoragePtr InterpreterInsertQuery::getTable()
Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table)
{
if (!cached_table)
cached_table = loadTable();
return cached_table;
}
Block InterpreterInsertQuery::getSampleBlock()
{
ASTInsertQuery & query = typeid_cast<ASTInsertQuery &>(*query_ptr);
Block table_sample_non_materialized = table->getSampleBlockNonMaterialized();
/// If the query does not include information about columns
if (!query.columns)
return getTable()->getSampleBlockNonMaterialized();
return table_sample_non_materialized;
Block table_sample = getTable()->getSampleBlock();
Block table_sample = table->getSampleBlock();
/// Form the block based on the column names from the query
Block res;
@ -88,13 +80,11 @@ Block InterpreterInsertQuery::getSampleBlock()
if (!table_sample.has(current_name))
throw Exception("No such column " + current_name + " in table " + query.table, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
ColumnWithTypeAndName col;
col.name = current_name;
col.type = table_sample.getByName(current_name).type;
col.column = col.type->createColumn();
res.insert(std::move(col));
}
if (!allow_materialized && !table_sample_non_materialized.has(current_name))
throw Exception("Cannot insert column " + current_name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
res.insert(ColumnWithTypeAndName(table_sample.getByName(current_name).type, current_name));
}
return res;
}
@ -103,7 +93,7 @@ BlockIO InterpreterInsertQuery::execute()
{
ASTInsertQuery & query = typeid_cast<ASTInsertQuery &>(*query_ptr);
checkAccess(query);
StoragePtr table = getTable();
StoragePtr table = getTable(query);
auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);
@ -114,13 +104,11 @@ BlockIO InterpreterInsertQuery::execute()
out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, table, context, query_ptr, query.no_destination);
out = std::make_shared<MaterializingBlockOutputStream>(out);
out = std::make_shared<MaterializingBlockOutputStream>(out, table->getSampleBlock());
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, required_columns, table->column_defaults, context, static_cast<bool>(context.getSettingsRef().strict_insert_defaults));
if (!allow_materialized)
out = std::make_shared<ProhibitColumnsBlockOutputStream>(out, table->materialized_columns);
out, getSampleBlock(query, table), required_columns, table->column_defaults, context,
static_cast<bool>(context.getSettingsRef().strict_insert_defaults));
out = std::make_shared<SquashingBlockOutputStream>(
out, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
@ -130,27 +118,34 @@ BlockIO InterpreterInsertQuery::execute()
out = std::move(out_wrapper);
BlockIO res;
res.out_sample = getSampleBlock();
res.out = std::move(out);
/// What type of query: INSERT or INSERT SELECT?
if (!query.select)
{
res.out = out;
}
else
if (query.select)
{
InterpreterSelectQuery interpreter_select{query.select, context};
res.in = interpreter_select.execute().in;
res.in = std::make_shared<NullableAdapterBlockInputStream>(res.in, res.in->getHeader(), res.out_sample);
res.in = std::make_shared<CastTypeBlockInputStream>(context, res.in, res.out_sample);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);
res.in = std::make_shared<NullableAdapterBlockInputStream>(res.in, res.in->getHeader(), res.out->getHeader());
res.in = std::make_shared<CastTypeBlockInputStream>(context, res.in, res.out->getHeader());
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
res.out = nullptr;
if (!allow_materialized)
{
Block in_header = res.in->getHeader();
for (const auto & name_type : table->materialized_columns)
if (in_header.has(name_type.name))
throw Exception("Cannot insert column " + name_type.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
return res;
}
void InterpreterInsertQuery::checkAccess(const ASTInsertQuery & query)
{
const Settings & settings = context.getSettingsRef();
@ -163,4 +158,5 @@ void InterpreterInsertQuery::checkAccess(const ASTInsertQuery & query)
throw Exception("Cannot insert into table in readonly mode", ErrorCodes::READONLY);
}
}

View File

@ -25,14 +25,8 @@ public:
BlockIO execute() override;
private:
/// Cache storage to avoid double table function call.
StoragePtr cached_table;
StoragePtr loadTable();
StoragePtr getTable();
Block getSampleBlock();
StoragePtr getTable(const ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table);
void checkAccess(const ASTInsertQuery & query);
ASTPtr query_ptr;

View File

@ -21,6 +21,7 @@
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -63,13 +64,14 @@ namespace ErrorCodes
extern const int ILLEGAL_FINAL;
extern const int ILLEGAL_PREWHERE;
extern const int TOO_MUCH_COLUMNS;
extern const int LOGICAL_ERROR;
}
InterpreterSelectQuery::~InterpreterSelectQuery() = default;
void InterpreterSelectQuery::init(const BlockInputStreamPtr & input, const Names & required_column_names)
void InterpreterSelectQuery::init(const Names & required_column_names)
{
ProfileEvents::increment(ProfileEvents::SelectQuery);
@ -95,22 +97,21 @@ void InterpreterSelectQuery::init(const BlockInputStreamPtr & input, const Names
ASTSelectQuery & head_query = static_cast<ASTSelectQuery &>(*head);
tail = head_query.next_union_all;
interpreter->next_select_in_union_all =
std::make_unique<InterpreterSelectQuery>(head, context, to_stage, subquery_depth);
interpreter->next_select_in_union_all = std::make_unique<InterpreterSelectQuery>(head, context, to_stage, subquery_depth);
interpreter = interpreter->next_select_in_union_all.get();
}
}
if (is_first_select_inside_union_all && hasAsterisk())
{
basicInit(input);
basicInit();
// We execute this code here, because otherwise the following kind of query would not work
// SELECT X FROM (SELECT * FROM (SELECT 1 AS X, 2 AS Y) UNION ALL SELECT 3, 4)
// because the asterisk is replaced with columns only when query_analyzer objects are created in basicInit().
renameColumns();
if (!required_column_names.empty() && (table_column_names.size() != required_column_names.size()))
if (!required_column_names.empty() && (source_header.columns() != required_column_names.size()))
{
rewriteExpressionList(required_column_names);
/// Now there is obsolete information to execute the query. We update this information.
@ -128,12 +129,12 @@ void InterpreterSelectQuery::init(const BlockInputStreamPtr & input, const Names
{
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
p->query_analyzer = std::make_unique<ExpressionAnalyzer>(
p->query_ptr, p->context, p->storage, p->table_column_names, p->subquery_depth,
p->query_ptr, p->context, p->storage, p->source_header.getNamesAndTypesList(), p->subquery_depth,
false, p->query_analyzer->getSubqueriesForSets());
}
}
basicInit(input);
basicInit();
}
}
@ -146,13 +147,12 @@ bool InterpreterSelectQuery::hasAggregation(const ASTSelectQuery & query_ptr)
return false;
}
void InterpreterSelectQuery::basicInit(const BlockInputStreamPtr & input)
void InterpreterSelectQuery::basicInit()
{
/// Read from prepared input.
if (input)
{
if (table_column_names.empty())
table_column_names = input->getHeader().getNamesAndTypesList();
source_header = input->getHeader();
}
else
{
@ -161,8 +161,7 @@ void InterpreterSelectQuery::basicInit(const BlockInputStreamPtr & input)
/// Read from subquery.
if (table_expression && typeid_cast<const ASTSelectQuery *>(table_expression.get()))
{
if (table_column_names.empty())
table_column_names = InterpreterSelectQuery::getSampleBlock(table_expression, context).getNamesAndTypesList();
source_header = InterpreterSelectQuery::getSampleBlock(table_expression, context);
}
else
{
@ -187,15 +186,14 @@ void InterpreterSelectQuery::basicInit(const BlockInputStreamPtr & input)
}
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
if (table_column_names.empty())
table_column_names = storage->getColumnsListNonMaterialized();
source_header = storage->getSampleBlockNonMaterialized();
}
}
if (table_column_names.empty())
if (!source_header)
throw Exception("There are no available columns", ErrorCodes::THERE_IS_NO_COLUMN);
query_analyzer = std::make_unique<ExpressionAnalyzer>(query_ptr, context, storage, table_column_names, subquery_depth, !only_analyze);
query_analyzer = std::make_unique<ExpressionAnalyzer>(query_ptr, context, storage, source_header.getNamesAndTypesList(), subquery_depth, !only_analyze);
if (query.sample_size() && (input || !storage || !storage->supportsSampling()))
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
@ -210,46 +208,46 @@ void InterpreterSelectQuery::basicInit(const BlockInputStreamPtr & input)
for (const auto & it : query_analyzer->getExternalTables())
if (!context.tryGetExternalTable(it.first))
context.addExternalTable(it.first, it.second);
if (input)
streams.push_back(input);
if (is_first_select_inside_union_all)
{
/// We check that the results of all SELECT queries are compatible.
Block first = getSampleBlock();
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
Block current = p->getSampleBlock();
if (!blocksHaveEqualStructure(first, current))
throw Exception("Result structures mismatch in the SELECT queries of the UNION ALL chain. Found result structure:\n\n" + current.dumpStructure()
+ "\n\nwhile expecting:\n\n" + first.dumpStructure() + "\n\ninstead",
ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
}
}
}
void InterpreterSelectQuery::initQueryAnalyzer()
{
query_analyzer = std::make_unique<ExpressionAnalyzer>(query_ptr, context, storage, table_column_names, subquery_depth, !only_analyze);
query_analyzer = std::make_unique<ExpressionAnalyzer>(query_ptr, context, storage, source_header.getNamesAndTypesList(), subquery_depth, !only_analyze);
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
p->query_analyzer = std::make_unique<ExpressionAnalyzer>(p->query_ptr, p->context, p->storage, p->table_column_names, p->subquery_depth, !only_analyze);
p->query_analyzer = std::make_unique<ExpressionAnalyzer>(p->query_ptr, p->context, p->storage, p->source_header.getNamesAndTypesList(), p->subquery_depth, !only_analyze);
}
InterpreterSelectQuery::InterpreterSelectQuery(const ASTPtr & query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_, BlockInputStreamPtr input)
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_,
const BlockInputStreamPtr & input)
: InterpreterSelectQuery(query_ptr_, context_, {}, to_stage_, subquery_depth_, input)
{
}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const Names & required_column_names_,
QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_,
const BlockInputStreamPtr & input)
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(to_stage_)
, subquery_depth(subquery_depth_)
, is_first_select_inside_union_all(query.isUnionAllHead())
, input(input)
, log(&Logger::get("InterpreterSelectQuery"))
{
init(input);
init(required_column_names_);
}
InterpreterSelectQuery::InterpreterSelectQuery(OnlyAnalyzeTag, const ASTPtr & query_ptr_, const Context & context_)
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
@ -262,27 +260,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(OnlyAnalyzeTag, const ASTPtr & qu
init({});
}
InterpreterSelectQuery::InterpreterSelectQuery(const ASTPtr & query_ptr_, const Context & context_,
const Names & required_column_names_,
QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input)
: InterpreterSelectQuery(query_ptr_, context_, required_column_names_, {}, to_stage_, subquery_depth_, input)
{
}
InterpreterSelectQuery::InterpreterSelectQuery(const ASTPtr & query_ptr_, const Context & context_,
const Names & required_column_names_,
const NamesAndTypesList & table_column_names_, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input)
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(to_stage_)
, subquery_depth(subquery_depth_)
, table_column_names(table_column_names_)
, is_first_select_inside_union_all(query.isUnionAllHead())
, log(&Logger::get("InterpreterSelectQuery"))
{
init(input, required_column_names_);
}
bool InterpreterSelectQuery::hasAsterisk() const
{
@ -290,13 +267,9 @@ bool InterpreterSelectQuery::hasAsterisk() const
return true;
if (is_first_select_inside_union_all)
{
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
if (p->query.hasAsterisk())
return true;
}
}
return false;
}
@ -304,10 +277,8 @@ bool InterpreterSelectQuery::hasAsterisk() const
void InterpreterSelectQuery::renameColumns()
{
if (is_first_select_inside_union_all)
{
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
p->query.renameColumns(query);
}
}
void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column_names)
@ -316,21 +287,15 @@ void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column
return;
if (is_first_select_inside_union_all)
{
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
if (p->query.distinct)
return;
}
}
query.rewriteSelectExpressionList(required_column_names);
if (is_first_select_inside_union_all)
{
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
p->query.rewriteSelectExpressionList(required_column_names);
}
}
void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name)
@ -361,28 +326,12 @@ void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, St
}
DataTypes InterpreterSelectQuery::getReturnTypes()
{
DataTypes res;
const NamesAndTypesList & columns = query_analyzer->getSelectSampleBlock().getNamesAndTypesList();
for (auto & column : columns)
res.push_back(column.type);
return res;
}
Block InterpreterSelectQuery::getSampleBlock()
{
Block block = query_analyzer->getSelectSampleBlock();
/// create non-zero columns so that SampleBlock can be
/// written (read) with BlockOut(In)putStreams
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & col = block.safeGetByPosition(i);
col.column = col.type->createColumn();
}
return block;
Pipeline pipeline;
executeWithoutUnionImpl(pipeline, std::make_shared<OneBlockInputStream>(source_header));
auto res = pipeline.firstStream()->getHeader();
return res;
}
@ -394,12 +343,12 @@ Block InterpreterSelectQuery::getSampleBlock(const ASTPtr & query_ptr_, const Co
BlockIO InterpreterSelectQuery::execute()
{
(void) executeWithoutUnion();
executeUnion();
Pipeline pipeline;
executeWithoutUnionImpl(pipeline, input);
executeUnion(pipeline);
/// Constraints on the result, the quota on the result, and also callback for progress.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(streams[0].get()))
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(pipeline.firstStream().get()))
{
/// Constraints apply only to the final result.
if (to_stage == QueryProcessingStage::Complete)
@ -418,34 +367,42 @@ BlockIO InterpreterSelectQuery::execute()
}
BlockIO res;
res.in = streams[0];
res.in = pipeline.firstStream();
return res;
}
const BlockInputStreams & InterpreterSelectQuery::executeWithoutUnion()
BlockInputStreams InterpreterSelectQuery::executeWithoutUnion()
{
Pipeline pipeline;
executeWithoutUnionImpl(pipeline, input);
return pipeline.streams;
}
void InterpreterSelectQuery::executeWithoutUnionImpl(Pipeline & pipeline, const BlockInputStreamPtr & input)
{
if (input)
pipeline.streams.push_back(input);
if (is_first_select_inside_union_all)
{
executeSingleQuery();
executeSingleQuery(pipeline);
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
p->executeSingleQuery();
const auto & others = p->streams;
streams.insert(streams.end(), others.begin(), others.end());
Pipeline other_pipeline;
p->executeSingleQuery(other_pipeline);
pipeline.streams.insert(pipeline.streams.end(), other_pipeline.streams.begin(), other_pipeline.streams.end());
}
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<MaterializingBlockInputStream>(stream);
});
}
else
executeSingleQuery();
return streams;
executeSingleQuery(pipeline);
}
void InterpreterSelectQuery::executeSingleQuery()
void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
{
/** Streams of data. When the query is executed in parallel, we have several data streams.
* If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
@ -462,7 +419,7 @@ void InterpreterSelectQuery::executeSingleQuery()
union_within_single_query = false;
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns();
QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
@ -510,10 +467,6 @@ void InterpreterSelectQuery::executeSingleQuery()
has_join = true;
before_join = chain.getLastActions();
chain.addStep();
const ASTTableJoin & join = static_cast<const ASTTableJoin &>(*query.join()->table_join);
if (join.kind == ASTTableJoin::Kind::Full || join.kind == ASTTableJoin::Kind::Right)
stream_with_non_joined_data = before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(settings.max_block_size);
}
if (query_analyzer->appendWhere(chain, !first_stage))
@ -579,18 +532,25 @@ void InterpreterSelectQuery::executeSingleQuery()
if (first_stage)
{
if (has_join)
for (auto & stream : streams) /// Applies to all sources except stream_with_non_joined_data.
{
const ASTTableJoin & join = static_cast<const ASTTableJoin &>(*query.join()->table_join);
if (join.kind == ASTTableJoin::Kind::Full || join.kind == ASTTableJoin::Kind::Right)
pipeline.stream_with_non_joined_data = before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(
pipeline.firstStream()->getHeader(), settings.max_block_size);
for (auto & stream : pipeline.streams) /// Applies to all sources except stream_with_non_joined_data.
stream = std::make_shared<ExpressionBlockInputStream>(stream, before_join);
}
if (has_where)
executeWhere(before_where);
executeWhere(pipeline, before_where);
if (need_aggregate)
executeAggregation(before_aggregation, aggregate_overflow_row, aggregate_final);
executeAggregation(pipeline, before_aggregation, aggregate_overflow_row, aggregate_final);
else
{
executeExpression(before_order_and_select);
executeDistinct(true, selected_columns);
executeExpression(pipeline, before_order_and_select);
executeDistinct(pipeline, true, selected_columns);
}
/** For distributed query processing,
@ -601,13 +561,13 @@ void InterpreterSelectQuery::executeSingleQuery()
if (!second_stage && !need_aggregate && !has_having)
{
if (has_order_by)
executeOrder();
executeOrder(pipeline);
if (has_order_by && query.limit_length)
executeDistinct(false, selected_columns);
executeDistinct(pipeline, false, selected_columns);
if (query.limit_length)
executePreLimit();
executePreLimit(pipeline);
}
}
@ -619,24 +579,24 @@ void InterpreterSelectQuery::executeSingleQuery()
{
/// If you need to combine aggregated results from multiple servers
if (!first_stage)
executeMergeAggregated(aggregate_overflow_row, aggregate_final);
executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final);
if (!aggregate_final)
executeTotalsAndHaving(has_having, before_having, aggregate_overflow_row);
executeTotalsAndHaving(pipeline, has_having, before_having, aggregate_overflow_row);
else if (has_having)
executeHaving(before_having);
executeHaving(pipeline, before_having);
executeExpression(before_order_and_select);
executeDistinct(true, selected_columns);
executeExpression(pipeline, before_order_and_select);
executeDistinct(pipeline, true, selected_columns);
need_second_distinct_pass = query.distinct && hasMoreThanOneStream();
need_second_distinct_pass = query.distinct && pipeline.hasMoreThanOneStream();
}
else
{
need_second_distinct_pass = query.distinct && hasMoreThanOneStream();
need_second_distinct_pass = query.distinct && pipeline.hasMoreThanOneStream();
if (query.group_by_with_totals && !aggregate_final)
executeTotalsAndHaving(false, nullptr, aggregate_overflow_row);
executeTotalsAndHaving(pipeline, false, nullptr, aggregate_overflow_row);
}
if (has_order_by)
@ -646,17 +606,17 @@ void InterpreterSelectQuery::executeSingleQuery()
* - therefore, we merge the sorted streams from remote servers.
*/
if (!first_stage && !need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted();
executeMergeSorted(pipeline);
else /// Otherwise, just sort.
executeOrder();
executeOrder(pipeline);
}
executeProjection(final_projection);
executeProjection(pipeline, final_projection);
/// At this stage, we can calculate the minimums and maximums, if necessary.
if (settings.extremes)
{
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
p_stream->enableExtremes();
@ -666,36 +626,36 @@ void InterpreterSelectQuery::executeSingleQuery()
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
* limiting the number of entries in each up to `offset + limit`.
*/
if (query.limit_length && hasMoreThanOneStream() && !query.distinct && !query.limit_by_expression_list)
executePreLimit();
if (query.limit_length && pipeline.hasMoreThanOneStream() && !query.distinct && !query.limit_by_expression_list)
executePreLimit(pipeline);
if (stream_with_non_joined_data || need_second_distinct_pass)
if (pipeline.stream_with_non_joined_data || need_second_distinct_pass)
union_within_single_query = true;
/// To execute LIMIT BY we should merge all streams together.
if (query.limit_by_expression_list && hasMoreThanOneStream())
if (query.limit_by_expression_list && pipeline.hasMoreThanOneStream())
union_within_single_query = true;
if (union_within_single_query)
executeUnion();
executeUnion(pipeline);
if (streams.size() == 1)
if (pipeline.streams.size() == 1)
{
/** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams.
*/
if (need_second_distinct_pass)
executeDistinct(false, Names());
executeDistinct(pipeline, false, Names());
executeLimitBy();
executeLimit();
executeLimitBy(pipeline);
executeLimit(pipeline);
}
}
}
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(subqueries_for_sets);
executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets);
}
@ -711,7 +671,7 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
}
}
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline)
{
/// The subquery interpreter, if the subquery
std::optional<InterpreterSelectQuery> interpreter_subquery;
@ -749,7 +709,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
required_columns_expr_list->children.emplace_back(std::make_shared<ASTIdentifier>(StringRange(), column));
}
alias_actions = ExpressionAnalyzer{required_columns_expr_list, context, storage, table_column_names}.getActions(true);
alias_actions = ExpressionAnalyzer{required_columns_expr_list, context, storage, source_header.getNamesAndTypesList()}.getActions(true);
/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
required_columns = alias_actions->getRequiredColumns();
@ -830,9 +790,20 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
query_analyzer->makeSetsForIndex();
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery?
if (!interpreter_subquery)
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
if (!pipeline.streams.empty())
{
/// Prepared input.
}
else if (interpreter_subquery)
{
/// Subquery.
interpreter_subquery->executeWithoutUnionImpl(pipeline, {});
}
else if (storage)
{
/// Table.
if (max_streams == 0)
throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR);
@ -860,76 +831,73 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
}
/// If there was no already prepared input.
if (streams.empty())
streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
if (pipeline.streams.empty())
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
if (streams.empty())
streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
if (pipeline.streams.empty())
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
if (alias_actions)
{
/// Wrap each stream returned from the table to calculate and add ALIAS columns
transformStreams([&] (auto & stream)
pipeline.transform([&] (auto & stream)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, alias_actions);
});
}
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream->addTableLock(table_lock);
});
/** Set the limits and quota for reading data, the speed and time of the query.
* Such restrictions are checked on the initiating server of the request, and not on remote servers.
* Because the initiating server has a summary of the execution of the request on all servers.
*/
if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.max_rows_to_read = settings.limits.max_rows_to_read;
limits.max_bytes_to_read = settings.limits.max_bytes_to_read;
limits.read_overflow_mode = settings.limits.read_overflow_mode;
limits.max_execution_time = settings.limits.max_execution_time;
limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode;
limits.min_execution_speed = settings.limits.min_execution_speed;
limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed;
QuotaForIntervals & quota = context.getQuota();
pipeline.transform([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->setLimits(limits);
p_stream->setQuota(quota);
}
});
}
}
else
{
const auto & subquery_streams = interpreter_subquery->executeWithoutUnion();
streams.insert(streams.end(), subquery_streams.begin(), subquery_streams.end());
}
/** Set the limits and quota for reading data, the speed and time of the query.
* Such restrictions are checked on the initiating server of the request, and not on remote servers.
* Because the initiating server has a summary of the execution of the request on all servers.
*/
if (storage && to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.max_rows_to_read = settings.limits.max_rows_to_read;
limits.max_bytes_to_read = settings.limits.max_bytes_to_read;
limits.read_overflow_mode = settings.limits.read_overflow_mode;
limits.max_execution_time = settings.limits.max_execution_time;
limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode;
limits.min_execution_speed = settings.limits.min_execution_speed;
limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed;
QuotaForIntervals & quota = context.getQuota();
transformStreams([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->setLimits(limits);
p_stream->setQuota(quota);
}
});
}
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
return from_stage;
}
void InterpreterSelectQuery::executeWhere(const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression)
{
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FilterBlockInputStream>(stream, expression, query.where_expression->getColumnName());
});
}
void InterpreterSelectQuery::executeAggregation(const ExpressionActionsPtr & expression, bool overflow_row, bool final)
void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
{
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, expression);
});
@ -938,7 +906,7 @@ void InterpreterSelectQuery::executeAggregation(const ExpressionActionsPtr & exp
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header = streams[0]->getHeader();
Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
keys.push_back(header.getPositionByName(name));
@ -953,7 +921,7 @@ void InterpreterSelectQuery::executeAggregation(const ExpressionActionsPtr & exp
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
bool allow_to_use_two_level_group_by = streams.size() > 1 || settings.limits.max_bytes_before_external_group_by != 0;
bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.limits.max_bytes_before_external_group_by != 0;
Aggregator::Params params(header, keys, aggregates,
overflow_row, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
@ -964,43 +932,43 @@ void InterpreterSelectQuery::executeAggregation(const ExpressionActionsPtr & exp
context.getTemporaryPath());
/// If there are several sources, then we perform parallel aggregation
if (streams.size() > 1)
if (pipeline.streams.size() > 1)
{
streams[0] = std::make_shared<ParallelAggregatingBlockInputStream>(
streams, stream_with_non_joined_data, params, final,
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams, pipeline.stream_with_non_joined_data, params, final,
max_streams,
settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads));
stream_with_non_joined_data = nullptr;
streams.resize(1);
pipeline.stream_with_non_joined_data = nullptr;
pipeline.streams.resize(1);
}
else
{
BlockInputStreams inputs;
if (!streams.empty())
inputs.push_back(streams[0]);
if (!pipeline.streams.empty())
inputs.push_back(pipeline.firstStream());
else
streams.resize(1);
pipeline.streams.resize(1);
if (stream_with_non_joined_data)
inputs.push_back(stream_with_non_joined_data);
if (pipeline.stream_with_non_joined_data)
inputs.push_back(pipeline.stream_with_non_joined_data);
streams[0] = std::make_shared<AggregatingBlockInputStream>(std::make_shared<ConcatBlockInputStream>(inputs), params, final);
pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(std::make_shared<ConcatBlockInputStream>(inputs), params, final);
stream_with_non_joined_data = nullptr;
pipeline.stream_with_non_joined_data = nullptr;
}
}
void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool final)
void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final)
{
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header = streams[0]->getHeader();
Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
@ -1028,48 +996,48 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
if (!settings.distributed_aggregation_memory_efficient)
{
/// We union several sources into one, parallelizing the work.
executeUnion();
executeUnion(pipeline);
/// Now merge the aggregated blocks
streams[0] = std::make_shared<MergingAggregatedBlockInputStream>(streams[0], params, final, settings.max_threads);
pipeline.firstStream() = std::make_shared<MergingAggregatedBlockInputStream>(pipeline.firstStream(), params, final, settings.max_threads);
}
else
{
streams[0] = std::make_shared<MergingAggregatedMemoryEfficientBlockInputStream>(streams, params, final,
pipeline.firstStream() = std::make_shared<MergingAggregatedMemoryEfficientBlockInputStream>(pipeline.streams, params, final,
max_streams,
settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads));
streams.resize(1);
pipeline.streams.resize(1);
}
}
void InterpreterSelectQuery::executeHaving(const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression)
{
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FilterBlockInputStream>(stream, expression, query.having_expression->getColumnName());
});
}
void InterpreterSelectQuery::executeTotalsAndHaving(bool has_having, const ExpressionActionsPtr & expression, bool overflow_row)
void InterpreterSelectQuery::executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row)
{
executeUnion();
executeUnion(pipeline);
const Settings & settings = context.getSettingsRef();
streams[0] = std::make_shared<TotalsHavingBlockInputStream>(
streams[0], overflow_row, expression,
pipeline.firstStream() = std::make_shared<TotalsHavingBlockInputStream>(
pipeline.firstStream(), overflow_row, expression,
has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, settings.totals_auto_threshold);
}
void InterpreterSelectQuery::executeExpression(const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression)
{
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, expression);
});
@ -1111,14 +1079,14 @@ static size_t getLimitForSorting(ASTSelectQuery & query)
}
void InterpreterSelectQuery::executeOrder()
void InterpreterSelectQuery::executeOrder(Pipeline & pipeline)
{
SortDescription order_descr = getSortDescription(query);
size_t limit = getLimitForSorting(query);
const Settings & settings = context.getSettingsRef();
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, limit);
@ -1134,16 +1102,16 @@ void InterpreterSelectQuery::executeOrder()
});
/// If there are several streams, we merge them into one
executeUnion();
executeUnion(pipeline);
/// Merge the sorted blocks.
streams[0] = std::make_shared<MergeSortingBlockInputStream>(
streams[0], order_descr, settings.max_block_size, limit,
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
settings.limits.max_bytes_before_external_sort, context.getTemporaryPath());
}
void InterpreterSelectQuery::executeMergeSorted()
void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
{
SortDescription order_descr = getSortDescription(query);
size_t limit = getLimitForSorting(query);
@ -1151,33 +1119,33 @@ void InterpreterSelectQuery::executeMergeSorted()
const Settings & settings = context.getSettingsRef();
/// If there are several streams, then we merge them into one
if (hasMoreThanOneStream())
if (pipeline.hasMoreThanOneStream())
{
/** MergingSortedBlockInputStream reads the sources sequentially.
* To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream.
*/
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<AsynchronousBlockInputStream>(stream);
});
/// Merge the sorted sources into one sorted source.
streams[0] = std::make_shared<MergingSortedBlockInputStream>(streams, order_descr, settings.max_block_size, limit);
streams.resize(1);
pipeline.firstStream() = std::make_shared<MergingSortedBlockInputStream>(pipeline.streams, order_descr, settings.max_block_size, limit);
pipeline.streams.resize(1);
}
}
void InterpreterSelectQuery::executeProjection(const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression)
{
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, expression);
});
}
void InterpreterSelectQuery::executeDistinct(bool before_order, Names columns)
void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_order, Names columns)
{
if (query.distinct)
{
@ -1193,7 +1161,7 @@ void InterpreterSelectQuery::executeDistinct(bool before_order, Names columns)
if (!query.order_expression_list || !before_order)
limit_for_distinct = limit_length + limit_offset;
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
if (stream->isGroupedOutput())
stream = std::make_shared<DistinctSortedBlockInputStream>(stream, settings.limits, limit_for_distinct, columns);
@ -1201,33 +1169,33 @@ void InterpreterSelectQuery::executeDistinct(bool before_order, Names columns)
stream = std::make_shared<DistinctBlockInputStream>(stream, settings.limits, limit_for_distinct, columns);
});
if (hasMoreThanOneStream())
if (pipeline.hasMoreThanOneStream())
union_within_single_query = true;
}
}
void InterpreterSelectQuery::executeUnion()
void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
{
/// If there are still several streams, then we combine them into one
if (hasMoreThanOneStream())
if (pipeline.hasMoreThanOneStream())
{
streams[0] = std::make_shared<UnionBlockInputStream<>>(streams, stream_with_non_joined_data, max_streams);
stream_with_non_joined_data = nullptr;
streams.resize(1);
pipeline.firstStream() = std::make_shared<UnionBlockInputStream<>>(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams);
pipeline.stream_with_non_joined_data = nullptr;
pipeline.streams.resize(1);
union_within_single_query = false;
}
else if (stream_with_non_joined_data)
else if (pipeline.stream_with_non_joined_data)
{
streams.push_back(stream_with_non_joined_data);
stream_with_non_joined_data = nullptr;
pipeline.streams.push_back(pipeline.stream_with_non_joined_data);
pipeline.stream_with_non_joined_data = nullptr;
union_within_single_query = false;
}
}
/// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined.
void InterpreterSelectQuery::executePreLimit()
void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
{
size_t limit_length = 0;
size_t limit_offset = 0;
@ -1236,18 +1204,18 @@ void InterpreterSelectQuery::executePreLimit()
/// If there is LIMIT
if (query.limit_length)
{
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, 0, false);
});
if (hasMoreThanOneStream())
if (pipeline.hasMoreThanOneStream())
union_within_single_query = true;
}
}
void InterpreterSelectQuery::executeLimitBy()
void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline)
{
if (!query.limit_by_value || !query.limit_by_expression_list)
return;
@ -1260,7 +1228,7 @@ void InterpreterSelectQuery::executeLimitBy()
columns.emplace_back(elem->getAliasOrColumnName());
}
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<LimitByBlockInputStream>(
stream, value, columns
@ -1269,7 +1237,7 @@ void InterpreterSelectQuery::executeLimitBy()
}
void InterpreterSelectQuery::executeLimit()
void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
{
size_t limit_length = 0;
size_t limit_offset = 0;
@ -1319,7 +1287,7 @@ void InterpreterSelectQuery::executeLimit()
}
}
transformStreams([&](auto & stream)
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end);
});
@ -1327,31 +1295,16 @@ void InterpreterSelectQuery::executeLimit()
}
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(SubqueriesForSets & subqueries_for_sets)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
{
const Settings & settings = context.getSettingsRef();
executeUnion();
streams[0] = std::make_shared<CreatingSetsBlockInputStream>(streams[0], subqueries_for_sets, settings.limits);
}
template <typename Transform>
void InterpreterSelectQuery::transformStreams(Transform && transform)
{
for (auto & stream : streams)
transform(stream);
if (stream_with_non_joined_data)
transform(stream_with_non_joined_data);
}
bool InterpreterSelectQuery::hasMoreThanOneStream() const
{
return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
executeUnion(pipeline);
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(pipeline.firstStream(), subqueries_for_sets, settings.limits);
}
/// TODO This is trash.
void InterpreterSelectQuery::ignoreWithTotals()
{
query.group_by_with_totals = false;

View File

@ -50,7 +50,7 @@ public:
const Context & context_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
const BlockInputStreamPtr & input = nullptr);
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
@ -58,16 +58,7 @@ public:
const Names & required_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const Names & required_column_names,
const NamesAndTypesList & table_column_names_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
const BlockInputStreamPtr & input = nullptr);
~InterpreterSelectQuery();
@ -78,10 +69,8 @@ public:
/** Execute the query without union of threads, if it is possible.
*/
const BlockInputStreams & executeWithoutUnion();
BlockInputStreams executeWithoutUnion();
/// TODO It's confusing that these methods return result structure for the case of QueryProcessingStage::Complete regardless to the actual 'to_stage'.
DataTypes getReturnTypes();
Block getSampleBlock();
static Block getSampleBlock(
@ -89,23 +78,55 @@ public:
const Context & context_);
private:
/**
* - Optimization if an object is created only to call getSampleBlock(): consider only the first SELECT of the UNION ALL chain, because
* the first SELECT is sufficient to determine the required columns.
*/
struct Pipeline
{
/** Streams of data.
* The source data streams are produced in the executeFetchColumns function.
* Then they are converted (wrapped in other streams) using the `execute*` functions,
* to get the whole pipeline running the query.
*/
BlockInputStreams streams;
/** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows.
* It has a special meaning, since reading from it should be done after reading from the main streams.
* It is joined to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
*/
BlockInputStreamPtr stream_with_non_joined_data;
BlockInputStreamPtr & firstStream() { return streams.at(0); }
template <typename Transform>
void transform(Transform && transform)
{
for (auto & stream : streams)
transform(stream);
if (stream_with_non_joined_data)
transform(stream_with_non_joined_data);
}
bool hasMoreThanOneStream() const
{
return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
}
};
/** - Optimization if an object is created only to call getSampleBlock(): consider only the first SELECT of the UNION ALL chain, because
* the first SELECT is sufficient to determine the required columns.
*/
struct OnlyAnalyzeTag {};
InterpreterSelectQuery(
OnlyAnalyzeTag,
const ASTPtr & query_ptr_,
const Context & context_);
void init(const BlockInputStreamPtr & input, const Names & required_column_names = Names{});
void basicInit(const BlockInputStreamPtr & input);
void init(const Names & required_column_names);
void basicInit();
void initQueryAnalyzer();
bool hasAggregation(const ASTSelectQuery & query_ptr);
/// Execute one SELECT query from the UNION ALL chain.
void executeSingleQuery();
void executeSingleQuery(Pipeline & pipeline);
/** Leave only the necessary columns of the SELECT section in each query of the UNION ALL chain.
* However, if you use at least one DISTINCT in the chain, then all the columns are considered necessary,
@ -136,30 +157,24 @@ private:
/// Different stages of query execution.
/// Fetch data from the table. Returns the stage to which the query was processed in Storage.
QueryProcessingStage::Enum executeFetchColumns();
QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline);
void executeWhere(const ExpressionActionsPtr & expression);
void executeAggregation(const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(bool overflow_row, bool final);
void executeTotalsAndHaving(bool has_having, const ExpressionActionsPtr & expression, bool overflow_row);
void executeHaving(const ExpressionActionsPtr & expression);
void executeExpression(const ExpressionActionsPtr & expression);
void executeOrder();
void executeMergeSorted();
void executePreLimit();
void executeUnion();
void executeLimitBy();
void executeLimit();
void executeProjection(const ExpressionActionsPtr & expression);
void executeDistinct(bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
template <typename Transform>
void transformStreams(Transform && transform);
bool hasNoData() const;
bool hasMoreThanOneStream() const;
void executeWithoutUnionImpl(Pipeline & pipeline, const BlockInputStreamPtr & input);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline);
void executeLimitBy(Pipeline & pipeline);
void executeLimit(Pipeline & pipeline);
void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void ignoreWithTotals();
@ -177,24 +192,11 @@ private:
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
std::unique_ptr<ExpressionAnalyzer> query_analyzer;
NamesAndTypesList table_column_names;
Block source_header;
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;
/** Streams of data.
* The source data streams are produced in the executeFetchColumns function.
* Then they are converted (wrapped in other streams) using the `execute*` functions,
* to get the whole pipeline running the query.
*/
BlockInputStreams streams;
/** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows.
* It has a special meaning, since reading from it should be done after reading from the main streams.
* It is joined to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
*/
BlockInputStreamPtr stream_with_non_joined_data;
/// Is it the first SELECT query of the UNION ALL chain?
bool is_first_select_inside_union_all;
@ -208,6 +210,9 @@ private:
StoragePtr storage;
TableStructureReadLockPtr table_lock;
/// Used when we read from prepared input, not table or subquery.
BlockInputStreamPtr input;
/// Do union of streams within a SELECT query?
bool union_within_single_query = false;

View File

@ -11,6 +11,8 @@
#include <Interpreters/NullableUtils.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Core/ColumnNumbers.h>
#include <Common/typeid_cast.h>
@ -281,7 +283,7 @@ void Join::setSampleBlock(const Block & block)
/// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes));
sample_block_with_columns_to_add = block;
sample_block_with_columns_to_add = materializeBlock(block);
/// Move from `sample_block_with_columns_to_add` key columns to `sample_block_with_keys`, keeping the order.
size_t pos = 0;
@ -462,8 +464,8 @@ bool Join::insertFromBlock(const Block & block)
if (getFullness(kind))
{
/** Transfer the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream will wait for them.
/** Move the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream will expect.
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
@ -990,7 +992,7 @@ public:
size_t num_columns_left = left_sample_block.columns() - num_keys;
size_t num_columns_right = parent.sample_block_with_columns_to_add.columns();
result_sample_block = left_sample_block;
result_sample_block = materializeBlock(left_sample_block);
/// Add columns from the right-side table to the block.
for (size_t i = 0; i < num_columns_right; ++i)
@ -1154,7 +1156,7 @@ private:
};
BlockInputStreamPtr Join::createStreamWithNonJoinedRows(Block & left_sample_block, size_t max_block_size) const
BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sample_block, size_t max_block_size) const
{
return std::make_shared<NonJoinedBlockInputStream>(*this, left_sample_block, max_block_size);
}

View File

@ -253,7 +253,7 @@ public:
* Use only after all calls to joinBlock was done.
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
*/
BlockInputStreamPtr createStreamWithNonJoinedRows(Block & left_sample_block, size_t max_block_size) const;
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & left_sample_block, size_t max_block_size) const;
/// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const;

View File

@ -540,7 +540,7 @@ MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vect
* 1: the intersection of the set and the range is non-empty
* 2: the range contains elements not in the set
*/
BoolMask MergeTreeSetIndex::mayBeTrueInRange(const std::vector<Range> & key_ranges)
BoolMask MergeTreeSetIndex::mayBeTrueInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types)
{
std::vector<FieldWithInfinity> left_point;
std::vector<FieldWithInfinity> right_point;
@ -555,7 +555,7 @@ BoolMask MergeTreeSetIndex::mayBeTrueInRange(const std::vector<Range> & key_rang
std::optional<Range> new_range = PKCondition::applyMonotonicFunctionsChainToRange(
key_ranges[indexes_mapping[i].pk_index],
indexes_mapping[i].functions,
indexes_mapping[i].data_type);
data_types[indexes_mapping[i].pk_index]);
if (!new_range)
return {true, true};

View File

@ -179,12 +179,11 @@ public:
size_t tuple_index;
size_t pk_index;
std::vector<FunctionBasePtr> functions;
DataTypePtr data_type;
};
MergeTreeSetIndex(const SetElements & set_elements, std::vector<PKTuplePositionMapping> && indexes_mapping_);
BoolMask mayBeTrueInRange(const std::vector<Range> & key_ranges);
BoolMask mayBeTrueInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types);
private:
using OrderedTuples = std::vector<std::vector<FieldWithInfinity>>;
OrderedTuples ordered_set;

View File

@ -113,8 +113,6 @@ struct Settings
\
M(SettingDistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?") \
\
M(SettingGlobalSubqueriesMethod, global_subqueries_method, GlobalSubqueriesMethod::PUSH, "The method for executing GLOBAL subqueries.") \
\
M(SettingUInt64, max_concurrent_queries_for_user, 0, "The maximum number of concurrent requests per user.") \
\
M(SettingBool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be preformed") \

View File

@ -672,73 +672,6 @@ struct SettingDistributedProductMode
}
};
/// Method for executing global distributed subqueries.
enum class GlobalSubqueriesMethod
{
PUSH = 0, /// Send the subquery data to all remote servers.
PULL = 1, /// Remote servers will download the subquery data from the initiating server.
};
struct SettingGlobalSubqueriesMethod
{
GlobalSubqueriesMethod value;
bool changed = false;
SettingGlobalSubqueriesMethod(GlobalSubqueriesMethod x = GlobalSubqueriesMethod::PUSH) : value(x) {}
operator GlobalSubqueriesMethod() const { return value; }
SettingGlobalSubqueriesMethod & operator= (GlobalSubqueriesMethod x) { set(x); return *this; }
static GlobalSubqueriesMethod getGlobalSubqueriesMethod(const String & s)
{
if (s == "push")
return GlobalSubqueriesMethod::PUSH;
if (s == "pull")
return GlobalSubqueriesMethod::PULL;
throw Exception("Unknown global subqueries execution method: '" + s + "', must be one of 'push', 'pull'",
ErrorCodes::UNKNOWN_GLOBAL_SUBQUERIES_METHOD);
}
String toString() const
{
const char * strings[] = { "push", "pull" };
if (value < GlobalSubqueriesMethod::PUSH || value > GlobalSubqueriesMethod::PULL)
throw Exception("Unknown global subqueries execution method", ErrorCodes::UNKNOWN_GLOBAL_SUBQUERIES_METHOD);
return strings[static_cast<size_t>(value)];
}
void set(GlobalSubqueriesMethod x)
{
value = x;
changed = true;
}
void set(const Field & x)
{
set(safeGet<const String &>(x));
}
void set(const String & x)
{
set(getGlobalSubqueriesMethod(x));
}
void set(ReadBuffer & buf)
{
String x;
readBinary(x, buf);
set(x);
}
void write(WriteBuffer & buf) const
{
writeBinary(toString(), buf);
}
};
struct SettingString
{

View File

@ -125,7 +125,7 @@ int main(int argc, char ** argv)
LimitBlockInputStream lis(is, 20, std::max(0, static_cast<int>(n) - 20));
WriteBufferFromOStream out_buf(std::cout);
RowOutputStreamPtr os_ = std::make_shared<TabSeparatedRowOutputStream>(out_buf, block);
BlockOutputStreamFromRowOutputStream os(os_);
BlockOutputStreamFromRowOutputStream os(os_, is->getHeader());
copyData(lis, os);
}

View File

@ -1758,7 +1758,7 @@ protected:
{
/// CREATE TABLE and DROP PARTITION return empty block
RemoteBlockInputStream stream(*connection, query, Block(), context, &current_settings);
NullBlockOutputStream output;
NullBlockOutputStream output(Block());
copyData(stream, output);
if (increment_and_check_exit())

View File

@ -288,7 +288,7 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
state.io.out->writePrefix();
/// Send block to the client - table structure.
Block block = state.io.out_sample;
Block block = state.io.out->getHeader();
sendData(block);
readData(global_settings);
@ -417,7 +417,7 @@ void TCPHandler::sendTotals()
if (totals)
{
initBlockOutput();
initBlockOutput(totals);
writeVarUInt(Protocol::Server::Totals, *out);
writeStringBinary("", *out);
@ -438,7 +438,7 @@ void TCPHandler::sendExtremes()
if (extremes)
{
initBlockOutput();
initBlockOutput(extremes);
writeVarUInt(Protocol::Server::Extremes, *out);
writeStringBinary("", *out);
@ -662,7 +662,7 @@ void TCPHandler::initBlockInput()
}
void TCPHandler::initBlockOutput()
void TCPHandler::initBlockOutput(const Block & block)
{
if (!state.block_out)
{
@ -674,7 +674,8 @@ void TCPHandler::initBlockOutput()
state.block_out = std::make_shared<NativeBlockOutputStream>(
*state.maybe_compressed_out,
client_revision);
client_revision,
block.cloneEmpty());
}
}
@ -715,7 +716,7 @@ bool TCPHandler::isQueryCancelled()
void TCPHandler::sendData(const Block & block)
{
initBlockOutput();
initBlockOutput(block);
writeVarUInt(Protocol::Server::Data, *out);
writeStringBinary("", *out);

View File

@ -140,7 +140,7 @@ private:
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();
void initBlockOutput();
void initBlockOutput(const Block & block);
bool isQueryCancelled();

View File

@ -33,6 +33,7 @@
#include <condition_variable>
#include <mutex>
namespace CurrentMetrics
{
extern const Metric DistributedSend;
@ -53,14 +54,20 @@ namespace ErrorCodes
}
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_)
: storage(storage), query_ast(query_ast), cluster(cluster_), settings(settings_), insert_sync(insert_sync_),
insert_timeout(insert_timeout_)
DistributedBlockOutputStream::DistributedBlockOutputStream(
StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_)
: storage(storage), query_ast(query_ast), cluster(cluster_), settings(settings_), insert_sync(insert_sync_), insert_timeout(insert_timeout_)
{
}
Block DistributedBlockOutputStream::getHeader() const
{
return storage.getSampleBlock();
}
void DistributedBlockOutputStream::writePrefix()
{
deadline = std::chrono::steady_clock::now() + std::chrono::seconds(insert_timeout);
@ -472,7 +479,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
WriteBufferFromFile out{block_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get()};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
writeStringBinary(query_string, out);

View File

@ -36,8 +36,8 @@ public:
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_);
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Columns/FilterDescription.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsCommon.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
@ -337,11 +338,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_granule);
bool will_read_until_mark = unread_rows_in_current_granule == limit - pre_filter_pos;
UInt8 nonzero = 0;
for (size_t row = pre_filter_pos; row < limit; ++row)
nonzero |= pre_filter[row];
if (!nonzero)
if (memoryIsZero(&pre_filter[pre_filter_pos], (limit - pre_filter_pos) * sizeof(pre_filter[0])))
{
/// Zero! Prewhere condition is false for all (limit - pre_filter_pos) rows.
readRows();
@ -395,16 +392,21 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
post_filter.resize(post_filter_pos);
/// At this point we may have arrays with non-zero offsets but with empty data,
/// as a result of reading components of Nested data structures with no data in filesystem.
/// We must fill these arrays to filter them correctly.
reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
/// Filter the columns related to PREWHERE using pre_filter,
/// other columns - using post_filter.
size_t rows = 0;
for (const auto i : ext::range(0, res.columns()))
{
auto & col = res.safeGetByPosition(i);
auto & col = res.getByPosition(i);
if (col.name == prewhere_column_name && res.columns() > 1)
continue;
col.column =
col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter, -1);
col.column = col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter, -1);
rows = col.column->size();
}
if (task->size_predictor)
@ -412,8 +414,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// Replace column with condition value from PREWHERE to a constant.
if (!task->remove_prewhere_column)
res.getByName(prewhere_column_name).column = DataTypeUInt8().createColumnConst(rows, UInt64(1));
res.getByName(prewhere_column_name).column = DataTypeUInt8().createColumnConst(rows, UInt64(1))->convertToFullColumnIfConst();
}
if (res)
@ -470,7 +471,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
}
void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block)
void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block) const
{
const auto rows = block.rows();
@ -482,17 +483,23 @@ void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block)
{
if (virt_column_name == "_part")
{
block.insert(ColumnWithTypeAndName{
DataTypeString().createColumnConst(rows, task->data_part->name)->convertToFullColumnIfConst(),
std::make_shared<DataTypeString>(),
virt_column_name});
ColumnPtr column;
if (rows)
column = DataTypeString().createColumnConst(rows, task->data_part->name)->convertToFullColumnIfConst();
else
column = DataTypeString().createColumn();
block.insert({ column, std::make_shared<DataTypeString>(), virt_column_name});
}
else if (virt_column_name == "_part_index")
{
block.insert(ColumnWithTypeAndName{
DataTypeUInt64().createColumnConst(rows, static_cast<UInt64>(task->part_index_in_query))->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
virt_column_name});
ColumnPtr column;
if (rows)
column = DataTypeUInt64().createColumnConst(rows, static_cast<UInt64>(task->part_index_in_query))->convertToFullColumnIfConst();
else
column = DataTypeUInt64().createColumn();
block.insert({ column, std::make_shared<DataTypeUInt64>(), virt_column_name});
}
}
}

View File

@ -42,7 +42,7 @@ protected:
Block readFromPart();
void injectVirtualColumns(Block & block);
void injectVirtualColumns(Block & block) const;
protected:
MergeTreeData & storage;

View File

@ -60,12 +60,31 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
<< " rows starting from " << all_mark_ranges.front().begin * storage.index_granularity);
setTotalRowsApprox(total_rows);
header = storage.getSampleBlockForColumns(ordered_names);
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.
for (const auto & name_type : data_part->columns)
{
if (header.has(name_type.name))
{
auto & elem = header.getByName(name_type.name);
if (!elem.type->equals(*name_type.type))
{
elem.type = name_type.type;
elem.column = elem.type->createColumn();
}
}
}
injectVirtualColumns(header);
}
Block MergeTreeBlockInputStream::getHeader() const
{
return storage.getSampleBlockForColumns(ordered_names);
return header;
}

View File

@ -48,6 +48,7 @@ protected:
bool getNewTask() override;
private:
Block header;
/// Used by Task
Names ordered_names;

Some files were not shown because too many files have changed in this diff Show More