Evgeniy Gatov 2014-12-17 18:11:46 +03:00
commit 497ff4b9e9
73 changed files with 298 additions and 134 deletions

View File

@ -29,24 +29,28 @@ private:
public:
/// Takes an established connection.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_)
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = Context{})
: connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Takes an established connection. Takes ownership of the connection from the pool.
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), external_tables(external_tables_), stage(stage_)
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = Context{})
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_),
external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Takes a pool from which a connection will need to be obtained.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: pool(pool_), query(query_), external_tables(external_tables_), stage(stage_)
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = Context{})
: pool(pool_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
@ -103,7 +107,8 @@ protected:
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), settings, stage, DEFAULT_BLOCK_SIZE, 1);
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
else
@ -246,6 +251,7 @@ private:
/// Temporary tables that need to be sent to the remote servers.
Tables external_tables;
QueryProcessingStage::Enum stage;
Context context;
/// The query has been sent (this is done before receiving the first block).
bool sent_query = false;
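The three constructors above now take an optional const Context & (defaulting to an empty Context{}) and copy it into the new context member, so that the external tables can later be read through IStorage::read, which also gains a context parameter in this commit. A minimal sketch of the calling pattern, using simplified stand-in types rather than the real DB classes:

    #include <map>
    #include <string>

    // Simplified stand-ins; the real Context, Tables, Connection, etc. live in the DB headers.
    struct Context {};
    using Tables = std::map<std::string, int>;

    class RemoteBlockInputStream
    {
    public:
        // The context defaults to an empty Context{}, so existing call sites keep compiling;
        // callers that have one (e.g. StorageDistributed::read) pass it as the last argument.
        RemoteBlockInputStream(const std::string & query_,
                               const Tables & external_tables_ = Tables(),
                               const Context & context_ = Context{})
            : query(query_), external_tables(external_tables_), context(context_)
        {
        }

    private:
        std::string query;
        Tables external_tables;
        Context context;   // stored by value, as in the real class
    };

    int main()
    {
        Context ctx;
        RemoteBlockInputStream without_context("SELECT 1");        // old call sites still work
        RemoteBlockInputStream with_context("SELECT 1", {}, ctx);  // new call sites pass a context
    }

StorageDistributed::read and the remote() table function further down pass their own context as the last constructor argument.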

View File

@ -1,7 +1,7 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Functions/IFunction.h>
#include <Yandex/singleton.h>
namespace DB
@ -14,7 +14,7 @@ class Context;
* When it is created, a function may also use some reference data residing in the Context
* for its initialization (for example, capture a SharedPtr to it).
*/
class FunctionFactory
class FunctionFactory : public Singleton<FunctionFactory>
{
private:
typedef IFunction* (*Creator)(const Context & context); /// Not std::function, since that gives less indirection and a smaller object size.
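FunctionFactory now derives from Singleton<FunctionFactory> (from Yandex/singleton.h), so ordinary functions are obtained via FunctionFactory::instance().get(name, context) instead of context.getFunctionFactory().get(name, context); the corresponding member and getter are removed from Context below. A rough sketch of such a singleton, assuming the real template behaves roughly like a Meyers singleton (the actual header may differ):

    #include <iostream>
    #include <string>

    // Hypothetical minimal Singleton; the real template lives in Yandex/singleton.h and may differ.
    template <typename T>
    class Singleton
    {
    public:
        static T & instance()
        {
            static T the_instance;   // constructed on first use; thread-safe since C++11
            return the_instance;
        }

    protected:
        Singleton() = default;
    };

    class FunctionFactory : public Singleton<FunctionFactory>
    {
    public:
        // Placeholder for the real get(name, context), which returns a FunctionPtr.
        std::string get(const std::string & name) const { return "IFunction for " + name; }
    };

    int main()
    {
        // Call sites change from context.getFunctionFactory().get(name, context)
        // to FunctionFactory::instance().get(name, context); simplified here:
        std::cout << FunctionFactory::instance().get("toUInt64") << '\n';
    }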

View File

@ -14,7 +14,6 @@
#include <DB/Storages/MarkCache.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/Storages/IStorage.h>
#include <DB/Functions/FunctionFactory.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
@ -83,7 +82,6 @@ struct ContextShared
String path; /// Path to the directory with the data, with a trailing slash.
Databases databases; /// List of databases and the tables in them.
TableFunctionFactory table_function_factory; /// Table functions.
FunctionFactory function_factory; /// Ordinary functions.
AggregateFunctionFactory aggregate_function_factory; /// Aggregate functions.
DataTypeFactory data_type_factory; /// Data types.
StorageFactory storage_factory; /// Table engines.
@ -254,7 +252,6 @@ public:
void setSetting(const String & name, const std::string & value);
const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; }
const FunctionFactory & getFunctionFactory() const { return shared->function_factory; }
const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }
const StorageFactory & getStorageFactory() const { return shared->storage_factory; }

View File

@ -162,6 +162,7 @@ public:
virtual BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
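This is the central interface change: every IStorage::read override gains a const Context & context parameter between the query AST and the settings, and the storage headers that follow repeat the same edit. A minimal sketch of the new signature and of an override, with simplified stand-in types (DEFAULT_BLOCK_SIZE is assumed to be 65536 here):

    #include <string>
    #include <vector>

    // Simplified stand-ins for the real DB types.
    struct ASTPtr {};
    struct Context {};
    struct Settings {};
    namespace QueryProcessingStage { enum Enum { FetchColumns, Complete }; }
    using Names = std::vector<std::string>;
    using BlockInputStreams = std::vector<int>;

    struct IStorage
    {
        virtual BlockInputStreams read(
            const Names & column_names,
            ASTPtr query,
            const Context & context,                      // the new parameter
            const Settings & settings,
            QueryProcessingStage::Enum & processed_stage,
            size_t max_block_size = 65536,
            unsigned threads = 1) = 0;

        virtual ~IStorage() {}
    };

    struct StorageExample : IStorage
    {
        BlockInputStreams read(
            const Names &, ASTPtr, const Context &, const Settings &,
            QueryProcessingStage::Enum & processed_stage,
            size_t = 65536, unsigned = 1) override
        {
            processed_stage = QueryProcessingStage::FetchColumns;
            return {};   // a real storage would return one stream per thread here
        }
    };

    int main()
    {
        StorageExample storage;
        Context context;
        Settings settings;
        QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
        storage.read({"x"}, ASTPtr{}, context, settings, stage);
    }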

View File

@ -2,6 +2,7 @@
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <atomic>
namespace DB
{
@ -13,7 +14,7 @@ class MergeTreeDataMerger
public:
static const size_t NO_LIMIT = std::numeric_limits<size_t>::max();
MergeTreeDataMerger(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (Merger)")), canceled(false) {}
MergeTreeDataMerger(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (Merger)")) {}
typedef std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> AllowedMergingPredicate;
@ -49,9 +50,8 @@ public:
/** Cancels all merges. All mergeParts calls currently in progress will soon throw an exception.
* All new calls will throw exceptions until uncancelAll() is called.
*/
void cancelAll() { canceled = true; }
void uncancelAll() { canceled = false; }
bool cancelAll() { return canceled.exchange(true, std::memory_order_relaxed); }
bool uncancelAll() { return canceled.exchange(false, std::memory_order_relaxed); }
private:
MergeTreeData & data;
@ -61,7 +61,24 @@ private:
/// When we last wrote to the log that disk space had run out (so as not to log it too often).
time_t disk_space_warning_time = 0;
volatile bool canceled;
std::atomic<bool> canceled{false};
};
class MergeTreeMergeBlocker
{
public:
MergeTreeMergeBlocker(MergeTreeDataMerger & merger)
: merger(merger), was_cancelled{!merger.cancelAll()} {}
~MergeTreeMergeBlocker()
{
if (was_cancelled)
merger.uncancelAll();
}
private:
MergeTreeDataMerger & merger;
const bool was_cancelled;
};
}
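cancelAll() and uncancelAll() now return the previous value of the flag via an atomic exchange, and the new MergeTreeMergeBlocker is an RAII guard used by ALTER further down: it cancels merges in its constructor and re-enables them in its destructor only if it was the one that cancelled them, so nested blockers compose correctly. A self-contained sketch of the same pattern with the MergeTree machinery stripped away:

    #include <atomic>
    #include <cassert>

    // Simplified stand-in for MergeTreeDataMerger: only the cancellation flag.
    class Merger
    {
    public:
        // exchange() returns the previous value, so the caller learns whether
        // merges had already been cancelled by somebody else.
        bool cancelAll()   { return canceled.exchange(true,  std::memory_order_relaxed); }
        bool uncancelAll() { return canceled.exchange(false, std::memory_order_relaxed); }

        bool isCanceled() const { return canceled.load(std::memory_order_relaxed); }

    private:
        std::atomic<bool> canceled{false};
    };

    // RAII guard analogous to MergeTreeMergeBlocker: cancels merges on construction and
    // re-enables them on destruction only if this guard is the one that cancelled them.
    class MergeBlocker
    {
    public:
        explicit MergeBlocker(Merger & merger_)
            : merger(merger_), was_cancelled(!merger_.cancelAll()) {}

        ~MergeBlocker()
        {
            if (was_cancelled)
                merger.uncancelAll();
        }

    private:
        Merger & merger;
        const bool was_cancelled;   // true if *this* guard performed the cancellation
    };

    int main()
    {
        Merger merger;
        {
            MergeBlocker outer(merger);
            {
                MergeBlocker inner(merger);   // merges are already cancelled, inner does nothing extra
                assert(merger.isCanceled());
            }
            assert(merger.isCanceled());      // inner's destructor did not re-enable merges
        }
        assert(!merger.isCanceled());         // outer's destructor re-enabled them
    }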

View File

@ -20,6 +20,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -61,6 +61,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -40,6 +40,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -25,6 +25,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -37,6 +37,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -46,6 +47,7 @@ public:
const std::string & chunk_name,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -60,6 +60,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -162,6 +162,7 @@ public:
virtual BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -227,6 +228,7 @@ protected:
size_t to_mark,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -37,6 +37,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -83,6 +83,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -49,6 +49,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -70,6 +70,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -33,10 +33,11 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override
const size_t max_block_size = DEFAULT_BLOCK_SIZE,
const unsigned threads = 1) override
{
return { new NullBlockInputStream };
}

View File

@ -74,6 +74,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -26,6 +26,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -17,6 +17,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -28,6 +28,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -25,6 +25,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -129,6 +129,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -33,6 +33,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -90,7 +90,12 @@ private:
NamesAndTypesList res;
/// Send the query to the first available shard
BlockInputStreamPtr input = new RemoteBlockInputStream(&*cluster.pools.front(), query, &settings, Tables(), QueryProcessingStage::Complete);
BlockInputStreamPtr input{
new RemoteBlockInputStream{
cluster.pools.front().get(), query, &settings,
Tables(), QueryProcessingStage::Complete, context
}
};
input->readPrefix();
while (true)

View File

@ -65,7 +65,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in;
in = table->read(column_names, 0, Settings(), stage)[0];
in = table->read(column_names, 0, context, Settings(), stage)[0];
in = new ExpressionBlockInputStream(in, expression);
in = new LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));

View File

@ -70,7 +70,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage)[0];
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, context, Settings(), stage)[0];
in = new ExpressionBlockInputStream(in, expression);
in = new FilterBlockInputStream(in, 1);
in = new LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));

View File

@ -142,7 +142,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage)[0];
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, context, Settings(), stage)[0];
in = new ExpressionBlockInputStream(in, expression);
in = new FilterBlockInputStream(in, 4);
//in = new LimitBlockInputStream(in, 10);

View File

@ -85,7 +85,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage;
BlockInputStreamPtr in = table->read(column_names, 0, Settings(), stage)[0];
BlockInputStreamPtr in = table->read(column_names, 0, context, Settings(), stage)[0];
ForkBlockInputStreams fork(in);

View File

@ -21,6 +21,8 @@
#include <DB/Storages/StorageLog.h>
#include <DB/Interpreters/Context.h>
int main(int argc, char ** argv)
{
@ -102,7 +104,7 @@ int main(int argc, char ** argv)
if (argc == 2 && 0 == strcmp(argv[1], "read"))
{
QueryProcessingStage::Enum stage;
SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage)[0];
SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage)[0];
WriteBufferFromOStream out1(std::cout);
CompressedWriteBuffer out2(out1);
NativeBlockOutputStream out3(out2);

View File

@ -159,7 +159,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0];
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0];
in = new PartialSortingBlockInputStream(in, sort_columns);
in = new MergeSortingBlockInputStream(in, sort_columns);
//in = new LimitBlockInputStream(in, 10);

View File

@ -15,6 +15,8 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Interpreters/Context.h>
using Poco::SharedPtr;
@ -32,9 +34,9 @@ int main(int argc, char ** argv)
DB::QueryProcessingStage::Enum stage3;
DB::BlockInputStreams streams;
streams.push_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Settings(), stage1, 1)[0], 30, 30000));
streams.push_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Settings(), stage2, 1)[0], 30, 2000));
streams.push_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Settings(), stage3, 1)[0], 30, 100));
streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage1, 1)[0], 30, 30000));
streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage2, 1)[0], 30, 2000));
streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage3, 1)[0], 30, 100));
DB::UnionBlockInputStream union_stream(streams, 2);

View File

@ -39,7 +39,7 @@ int main(int argc, char ** argv)
DB::StoragePtr table = context.getTable("default", "hits6");
DB::QueryProcessingStage::Enum stage;
DB::BlockInputStreams streams = table->read(column_names, nullptr, settings, stage, settings.max_block_size, settings.max_threads);
DB::BlockInputStreams streams = table->read(column_names, nullptr, context, settings, stage, settings.max_block_size, settings.max_threads);
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = new DB::AsynchronousBlockInputStream(streams[i]);

View File

@ -32,6 +32,8 @@
#include <DB/Parsers/formatAST.h>
#include <statdaemons/ext/range.hpp>
namespace DB
{
@ -589,6 +591,9 @@ static SharedPtr<InterpreterSelectQuery> interpretSubquery(
if (!parse_res)
throw Exception("Error in parsing SELECT query while creating set or join for table " + table->name + ".",
ErrorCodes::LOGICAL_ERROR);
/// @note it may be more appropriate to manually replace ASTAsterisk with table's columns
ExpressionAnalyzer{query, context, subquery_depth};
}
else
query = subquery->children.at(0);
@ -1055,7 +1060,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
}
}
FunctionPtr function = context.getFunctionFactory().get(node->name, context);
const FunctionPtr & function = FunctionFactory::instance().get(node->name, context);
Names argument_names;
DataTypes argument_types;
@ -1737,26 +1742,32 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
return;
auto & node = typeid_cast<ASTJoin &>(*select_query->join);
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
auto & table = node.table->children.at(0); /// TODO: support identifiers.
size_t num_join_keys = keys.children.size();
for (size_t i = 0; i < num_join_keys; ++i)
Block nested_result_sample;
if (const auto identifier = typeid_cast<const ASTIdentifier *>(node.table.get()))
{
if (!join_key_names_left_set.insert(keys.children[i]->getColumnName()).second)
const auto & table = context.getTable("", identifier->name);
nested_result_sample = table->getSampleBlockNonMaterialized();
}
else if (typeid_cast<const ASTSubquery *>(node.table.get()))
{
const auto & table = node.table->children.at(0);
nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock();
}
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
for (const auto & key : keys.children)
{
if (!join_key_names_left_set.insert(key->getColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
if (!join_key_names_right_set.insert(keys.children[i]->getAliasOrColumnName()).second)
if (!join_key_names_right_set.insert(key->getAliasOrColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
}
Block nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock();
size_t nested_result_columns = nested_result_sample.columns();
for (size_t i = 0; i < nested_result_columns; ++i)
for (const auto i : ext::range(0, nested_result_sample.columns()))
{
auto col = nested_result_sample.getByPosition(i);
const auto & col = nested_result_sample.getByPosition(i);
if (!join_key_names_right_set.count(col.name))
{
joined_columns.insert(col.name);
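collectJoinedColumns now accepts either a table identifier or a subquery on the right-hand side of JOIN ... USING: for an identifier it looks the table up in the context and takes its non-materialized sample block, for a subquery it runs an ExpressionAnalyzer on it; every column of that sample block not listed in USING becomes a joined column. A rough sketch of the dispatch, with simplified stand-in types (the real code uses typeid_cast, an exact-type check, rather than dynamic_cast):

    #include <iostream>
    #include <string>

    // Simplified AST stand-ins; the real classes are ASTIdentifier and ASTSubquery.
    struct IAST { virtual ~IAST() {} };
    struct ASTIdentifier : IAST { std::string name; };
    struct ASTSubquery   : IAST {};

    // Mirrors the dispatch in collectJoinedColumns: the right-hand side of JOIN ... USING
    // is either a table name or a subquery, and the sample block is obtained accordingly.
    void resolveJoinedTable(const IAST & table_ast)
    {
        if (const auto * identifier = dynamic_cast<const ASTIdentifier *>(&table_ast))
            std::cout << "look up table '" << identifier->name << "' and take its sample block\n";
        else if (dynamic_cast<const ASTSubquery *>(&table_ast))
            std::cout << "analyze the subquery to obtain its result sample block\n";
    }

    int main()
    {
        ASTIdentifier id;
        id.name = "hits";
        resolveJoinedTable(id);
        resolveJoinedTable(ASTSubquery{});
    }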

View File

@ -547,7 +547,9 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
if (storage && storage->isRemote())
storage->storeExternalTables(query_analyzer->getExternalTables());
streams = storage->read(required_columns, query_ptr, settings_for_storage, from_stage, settings.max_block_size, settings.max_threads);
streams = storage->read(required_columns, query_ptr,
context, settings_for_storage, from_stage,
settings.max_block_size, settings.max_threads);
for (auto & stream : streams)
stream->addTableLock(table_lock);

View File

@ -464,7 +464,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name));
FunctionPtr function = context.getFunctionFactory().get("to" + new_type_name, context);
const FunctionPtr & function = FunctionFactory::instance().get("to" + new_type_name, context);
Names out_names;
out_expression->add(ExpressionAction::applyFunction(function, Names(1, column.name)), out_names);
out_expression->add(ExpressionAction::removeColumn(column.name));

View File

@ -402,7 +402,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
Block block;
while (!canceled && (block = merged_stream->read()))
while (!canceled.load(std::memory_order_relaxed) && (block = merged_stream->read()))
{
rows_written += block.rows();
to.write(block);
@ -414,7 +414,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
}
if (canceled)
if (canceled.load(std::memory_order_relaxed))
throw Exception("Canceled merging parts", ErrorCodes::ABORTED);
merged_stream->readSuffix();

View File

@ -38,10 +38,11 @@ static Block getBlockWithVirtualColumns(const MergeTreeData::DataPartsVector & p
BlockInputStreams MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads,
const size_t max_block_size,
const unsigned threads,
size_t * part_index)
{
size_t part_index_var = 0;
@ -74,15 +75,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
/// If at least one virtual column is requested, try indexing by it
if (!virt_column_names.empty())
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, data.context);
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
data.check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
PKCondition key_condition(query, data.context, data.getColumnsList(), data.getSortDescription());
PKCondition date_condition(query, data.context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
PKCondition key_condition(query, context, data.getColumnsList(), data.getSortDescription());
PKCondition date_condition(query, context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
/// Select the parts that may contain data satisfying date_condition and that match the condition on _part.
{

View File

@ -90,6 +90,7 @@ private:
BlockInputStreams StorageBuffer::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
@ -101,7 +102,7 @@ BlockInputStreams StorageBuffer::read(
if (!no_destination)
streams_from_dst = context.getTable(destination_database, destination_table)->read(
column_names, query, settings, processed_stage, max_block_size, threads);
column_names, query, context, settings, processed_stage, max_block_size, threads);
BlockInputStreams streams_from_buffers;
streams_from_buffers.reserve(num_shards);

View File

@ -62,10 +62,11 @@ bool StorageChunkMerger::hasColumn(const String & column_name) const
BlockInputStreams StorageChunkMerger::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
/// We will read from the Chunks tables that have at least one ChunkRef matching the regexp, and from the other tables matching the regexp.
Storages selected_tables;
@ -76,16 +77,17 @@ BlockInputStreams StorageChunkMerger::read(
typedef std::set<std::string> StringSet;
StringSet chunks_table_names;
Databases & databases = context.getDatabases();
const Databases & databases = context.getDatabases();
if (!databases.count(source_database))
const auto database_it = databases.find(source_database);
if (database_it == std::end(databases))
throw Exception("No database " + source_database, ErrorCodes::UNKNOWN_DATABASE);
Tables & tables = databases[source_database];
for (Tables::iterator it = tables.begin(); it != tables.end(); ++it)
const Tables & tables = database_it->second;
for (const auto & it : tables)
{
StoragePtr table = it->second;
if (table_name_regexp.match(it->first) &&
const StoragePtr & table = it.second;
if (table_name_regexp.match(it.first) &&
!typeid_cast<StorageChunks *>(&*table) &&
!typeid_cast<StorageChunkMerger *>(&*table))
{
@ -93,19 +95,20 @@ BlockInputStreams StorageChunkMerger::read(
{
if (chunk_ref->source_database_name != source_database)
{
LOG_WARNING(log, "ChunkRef " + it->first + " points to another database, ignoring");
LOG_WARNING(log, "ChunkRef " + it.first + " points to another database, ignoring");
continue;
}
if (!chunks_table_names.count(chunk_ref->source_table_name))
{
if (tables.count(chunk_ref->source_table_name))
const auto table_it = tables.find(chunk_ref->source_table_name);
if (table_it != std::end(tables))
{
chunks_table_names.insert(chunk_ref->source_table_name);
selected_tables.push_back(tables[chunk_ref->source_table_name]);
selected_tables.push_back(table_it->second);
}
else
{
LOG_WARNING(log, "ChunkRef " + it->first + " points to non-existing Chunks table, ignoring");
LOG_WARNING(log, "ChunkRef " + it.first + " points to non-existing Chunks table, ignoring");
}
}
}
@ -174,6 +177,7 @@ BlockInputStreams StorageChunkMerger::read(
BlockInputStreams source_streams = table->read(
real_column_names,
modified_query_ast,
context,
settings,
tmp_processed_stage,
max_block_size,
@ -465,6 +469,7 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
BlockInputStreams input_streams = src_storage->read(
src_column_names,
select_query_ptr,
context,
settings,
processed_stage,
DEFAULT_MERGE_BLOCK_SIZE);
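The source database and the referenced Chunks tables are now looked up with find() on const references instead of count() followed by operator[], which avoids a second lookup and works against a const Databases map. A minimal sketch of the pattern, with plain strings standing in for StoragePtr:

    #include <iostream>
    #include <map>
    #include <stdexcept>
    #include <string>

    using Tables    = std::map<std::string, std::string>;  // stand-in for name -> StoragePtr
    using Databases = std::map<std::string, Tables>;

    const Tables & getDatabase(const Databases & databases, const std::string & name)
    {
        // One lookup instead of count() + operator[]; also works on a const map.
        const auto it = databases.find(name);
        if (it == std::end(databases))
            throw std::runtime_error("No database " + name);
        return it->second;
    }

    int main()
    {
        Databases databases;
        databases["default"]["hits"] = "StorageLog";

        for (const auto & table : getDatabase(databases, "default"))
            std::cout << table.first << " -> " << table.second << '\n';
    }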

View File

@ -12,14 +12,17 @@ StoragePtr StorageChunkRef::create(const std::string & name_, const Context & co
}
BlockInputStreams StorageChunkRef::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
return getSource().readFromChunk(name, column_names, query, settings, processed_stage, max_block_size, threads);
return getSource().readFromChunk(name, column_names, query,
context, settings, processed_stage,
max_block_size, threads);
}
ASTPtr StorageChunkRef::getCustomCreateQuery(const Context & context) const

View File

@ -46,10 +46,11 @@ void StorageChunks::removeReference()
BlockInputStreams StorageChunks::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
bool has_virtual_column = false;
@ -59,17 +60,21 @@ BlockInputStreams StorageChunks::read(
/// If there are no virtual columns, simply read the data from the table
if (!has_virtual_column)
return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
return read(0, std::numeric_limits<size_t>::max(), column_names,
query, context, settings,
processed_stage, max_block_size, threads);
Block virtual_columns_block = getBlockWithVirtualColumns();
if (!VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context))
return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
return read(0, std::numeric_limits<size_t>::max(), column_names,
query, context, settings,
processed_stage, max_block_size, threads);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, _table_column_name);
BlockInputStreams res;
for (const auto & it : values)
{
BlockInputStreams temp = readFromChunk(it, column_names, query, settings, processed_stage, max_block_size, threads);
BlockInputStreams temp = readFromChunk(it, column_names, query, context, settings, processed_stage, max_block_size, threads);
res.insert(res.end(), temp.begin(), temp.end());
}
return res;
@ -92,10 +97,11 @@ BlockInputStreams StorageChunks::readFromChunk(
const std::string & chunk_name,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
size_t mark1;
size_t mark2;
@ -110,7 +116,7 @@ BlockInputStreams StorageChunks::readFromChunk(
mark2 = index + 1 == chunk_num_to_marks.size() ? marksCount() : chunk_num_to_marks[index + 1];
}
return read(mark1, mark2, column_names, query, settings, processed_stage, max_block_size, threads);
return read(mark1, mark2, column_names, query, context, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageChunks::writeToNewChunk(

View File

@ -140,10 +140,11 @@ StoragePtr StorageDistributed::create(
BlockInputStreams StorageDistributed::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
@ -163,7 +164,8 @@ BlockInputStreams StorageDistributed::read(
for (auto & conn_pool : cluster.pools)
res.emplace_back(new RemoteBlockInputStream{
conn_pool, modified_query, &new_settings,
external_tables, processed_stage});
external_tables, processed_stage, context
});
/// Add queries to the local ClickHouse.
if (cluster.getLocalNodesNum() > 0)

View File

@ -620,6 +620,7 @@ BlockInputStreams StorageLog::read(
size_t to_mark,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
@ -693,12 +694,15 @@ BlockInputStreams StorageLog::read(
BlockInputStreams StorageLog::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
return read(0, std::numeric_limits<size_t>::max(), column_names,
query, context, settings, processed_stage,
max_block_size, threads);
}

View File

@ -99,12 +99,13 @@ bool StorageMaterializedView::hasColumn(const String & column_name) const
BlockInputStreams StorageMaterializedView::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
return data->read(column_names, query, settings, processed_stage, max_block_size, threads);
return data->read(column_names, query, context, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageMaterializedView::write(ASTPtr query)

View File

@ -99,6 +99,7 @@ StoragePtr StorageMemory::create(
BlockInputStreams StorageMemory::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,

View File

@ -79,10 +79,11 @@ bool StorageMerge::hasColumn(const String & column_name) const
BlockInputStreams StorageMerge::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
BlockInputStreams res;
@ -143,6 +144,7 @@ BlockInputStreams StorageMerge::read(
BlockInputStreams source_streams = table->read(
real_column_names,
modified_query_ast,
context,
settings,
tmp_processed_stage,
max_block_size,

View File

@ -90,12 +90,13 @@ StorageMergeTree::~StorageMergeTree()
BlockInputStreams StorageMergeTree::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
return reader.read(column_names, query, settings, processed_stage, max_block_size, threads);
return reader.read(column_names, query, context, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
@ -127,6 +128,7 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & new_
void StorageMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
/// NOTE: Here, just as in ReplicatedMergeTree, it would be possible to do an ALTER that does not block data writes for long.
const MergeTreeMergeBlocker merge_blocker{merger};
auto table_soft_lock = lockDataForAlter();

View File

@ -1990,12 +1990,13 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
BlockInputStreams StorageReplicatedMergeTree::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
Names virt_column_names;
Names real_column_names;
@ -2024,8 +2025,9 @@ BlockInputStreams StorageReplicatedMergeTree::read(
if (unreplicated_reader && values.count(0))
{
res = unreplicated_reader->read(
real_column_names, query, settings, processed_stage, max_block_size, threads, &part_index);
res = unreplicated_reader->read(real_column_names, query,
context, settings, processed_stage,
max_block_size, threads, &part_index);
for (auto & virtual_column : virt_column_names)
{
@ -2039,7 +2041,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
if (values.count(1))
{
auto res2 = reader.read(real_column_names, query, settings, processed_stage, max_block_size, threads, &part_index);
auto res2 = reader.read(real_column_names, query, context, settings, processed_stage, max_block_size, threads, &part_index);
for (auto & virtual_column : virt_column_names)
{
@ -2099,6 +2101,8 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
const String & database_name, const String & table_name, Context & context)
{
auto zookeeper = getZooKeeper();
const MergeTreeMergeBlocker merger_blocker{merger};
const MergeTreeMergeBlocker unreplicated_merger_blocker{*unreplicated_merger};
LOG_DEBUG(log, "Doing ALTER");

View File

@ -21,8 +21,13 @@ StoragePtr StorageSystemDatabases::create(const std::string & name_, const Conte
BlockInputStreams StorageSystemDatabases::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -24,8 +24,13 @@ StoragePtr StorageSystemEvents::create(const std::string & name_)
BlockInputStreams StorageSystemEvents::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -33,8 +33,13 @@ StoragePtr StorageSystemMerges::create(const std::string & name, const Context &
}
BlockInputStreams StorageSystemMerges::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, const size_t max_block_size, const unsigned)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -66,8 +66,13 @@ StoragePtr StorageSystemNumbers::create(const std::string & name_, bool multithr
BlockInputStreams StorageSystemNumbers::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -24,8 +24,13 @@ StoragePtr StorageSystemOne::create(const std::string & name_)
BlockInputStreams StorageSystemOne::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -38,8 +38,13 @@ StoragePtr StorageSystemParts::create(const std::string & name_, const Context &
BlockInputStreams StorageSystemParts::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -32,8 +32,13 @@ StoragePtr StorageSystemProcesses::create(const std::string & name_, const Conte
BlockInputStreams StorageSystemProcesses::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -46,8 +46,13 @@ StoragePtr StorageSystemReplicas::create(const std::string & name_, const Contex
BlockInputStreams StorageSystemReplicas::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
@ -59,7 +64,7 @@ BlockInputStreams StorageSystemReplicas::read(
for (const auto & db : context.getDatabases())
for (const auto & table : db.second)
if (typeid_cast<const StorageReplicatedMergeTree *>(&*table.second))
if (typeid_cast<const StorageReplicatedMergeTree *>(table.second.get()))
replicated_tables[db.first][table.first] = table.second;
}

View File

@ -26,8 +26,13 @@ StoragePtr StorageSystemSettings::create(const std::string & name_, const Contex
BlockInputStreams StorageSystemSettings::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -24,8 +24,13 @@ StoragePtr StorageSystemTables::create(const std::string & name_, const Context
BlockInputStreams StorageSystemTables::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -103,8 +103,13 @@ static String extractPath(const ASTPtr & query)
BlockInputStreams StorageSystemZooKeeper::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -406,10 +406,11 @@ void StorageTinyLog::rename(const String & new_path_to_db, const String & new_da
BlockInputStreams StorageTinyLog::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -74,10 +74,11 @@ StorageView::StorageView(
BlockInputStreams StorageView::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
const size_t max_block_size,
const unsigned threads)
{
ASTPtr inner_query_clone = getInnerQuery();
ASTSelectQuery & inner_select = static_cast<ASTSelectQuery &>(*inner_query_clone);

View File

@ -20,6 +20,8 @@
#include <DB/Storages/StorageLog.h>
#include <DB/Interpreters/Context.h>
using Poco::SharedPtr;
using namespace DB;
@ -132,7 +134,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage;
BlockInputStreamPtr in = table->read(column_names, 0, Settings(), stage)[0];
BlockInputStreamPtr in = table->read(column_names, 0, Context{}, Settings(), stage)[0];
RowOutputStreamPtr out_ = new TabSeparatedRowOutputStream(out_buf, sample);
BlockOutputStreamFromRowOutputStream out(out_);
copyData(*in, out);

View File

@ -94,7 +94,7 @@ int main(int argc, char ** argv)
if (!parser.parse(begin, end, select, expected))
throw Poco::Exception("Cannot parse " + primary_expr_str);
SharedPtr<IBlockInputStream> in = table->read(column_names, select, Settings(), stage)[0];
SharedPtr<IBlockInputStream> in = table->read(column_names, select, context, Settings(), stage)[0];
Block sample;
{

View File

@ -10,6 +10,7 @@
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Interpreters/Context.h>
using Poco::SharedPtr;
@ -68,7 +69,7 @@ int main(int argc, char ** argv)
DB::QueryProcessingStage::Enum stage;
SharedPtr<DB::IBlockInputStream> in = table->read(column_names, 0, DB::Settings(), stage)[0];
SharedPtr<DB::IBlockInputStream> in = table->read(column_names, 0, DB::Context{}, DB::Settings(), stage)[0];
DB::Block sample;
{

View File

@ -9,6 +9,7 @@
#include <DB/DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Interpreters/Context.h>
using Poco::SharedPtr;
@ -31,7 +32,7 @@ int main(int argc, char ** argv)
DB::QueryProcessingStage::Enum stage;
DB::LimitBlockInputStream input(table->read(column_names, 0, DB::Settings(), stage, 10)[0], 10, 96);
DB::LimitBlockInputStream input(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage, 10)[0], 10, 96);
DB::RowOutputStreamPtr output_ = new DB::TabSeparatedRowOutputStream(out_buf, sample);
DB::BlockOutputStreamFromRowOutputStream output(output_);