Merge branch 'master' into database_atomic

Alexander Tokmakov 2019-11-11 20:32:39 +03:00
commit a6fe3578eb
116 changed files with 1779 additions and 417 deletions

View File

@ -1,11 +1,11 @@
# These strings are autochanged from release_lib.sh:
set(VERSION_REVISION 54428)
set(VERSION_REVISION 54429)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 17)
set(VERSION_MINOR 18)
set(VERSION_PATCH 1)
set(VERSION_GITHASH 5286d0afb285a5fbf3d320af3daa6de6b1841374)
set(VERSION_DESCRIBE v19.17.1.1-prestable)
set(VERSION_STRING 19.17.1.1)
set(VERSION_GITHASH 4e68211879480b637683ae66dbcc89a2714682af)
set(VERSION_DESCRIBE v19.18.1.1-prestable)
set(VERSION_STRING 19.18.1.1)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -18,6 +18,7 @@
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/NumberParser.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
@ -95,6 +96,7 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
std::string schema_name = "";
std::string table_name = params.get("table");
std::string connection_string = params.get("connection_string");
if (params.has("schema"))
{
schema_name = params.get("schema");
@ -106,6 +108,8 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
try
{
const bool external_table_functions_use_nulls = Poco::NumberParser::parseBool(params.get("external_table_functions_use_nulls", "false"));
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
SQLHDBC hdbc = session.dbc().handle();
@ -160,13 +164,13 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
/// TODO Why 301?
SQLCHAR column_name[301];
SQLSMALLINT nullable;
const auto result = POCO_SQL_ODBC_CLASS::SQLDescribeCol(hstmt, ncol, column_name, sizeof(column_name), nullptr, &type, nullptr, nullptr, &nullable);
SQLSMALLINT is_nullable;
const auto result = POCO_SQL_ODBC_CLASS::SQLDescribeCol(hstmt, ncol, column_name, sizeof(column_name), nullptr, &type, nullptr, nullptr, &is_nullable);
if (POCO_SQL_ODBC_CLASS::Utility::isError(result))
throw POCO_SQL_ODBC_CLASS::StatementException(hstmt);
auto column_type = getDataType(type);
if (nullable == SQL_NULLABLE)
if (external_table_functions_use_nulls && is_nullable == SQL_NULLABLE)
{
column_type = std::make_shared<DataTypeNullable>(column_type);
}
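
The change above only wraps a column's type in Nullable when both the external_table_functions_use_nulls setting is enabled and the ODBC driver reports the column as SQL_NULLABLE. A minimal standalone sketch of that decision, using hypothetical stand-in types rather than the ClickHouse API:

#include <iostream>
#include <memory>
#include <string>

// Stand-in types (hypothetical, not the ClickHouse classes), for illustration only.
struct IDataType
{
    virtual ~IDataType() = default;
    virtual std::string getName() const { return "Int32"; }
};

struct DataTypeNullable : IDataType
{
    std::shared_ptr<IDataType> nested;
    explicit DataTypeNullable(std::shared_ptr<IDataType> nested_) : nested(std::move(nested_)) {}
    std::string getName() const override { return "Nullable(" + nested->getName() + ")"; }
};

// Wrap only when the setting is enabled AND the driver reports the column as nullable.
std::shared_ptr<IDataType> resolveColumnType(bool use_nulls, bool reported_nullable)
{
    std::shared_ptr<IDataType> type = std::make_shared<IDataType>();
    if (use_nulls && reported_nullable)
        type = std::make_shared<DataTypeNullable>(type);
    return type;
}

int main()
{
    std::cout << resolveColumnType(true, true)->getName() << '\n';   // Nullable(Int32)
    std::cout << resolveColumnType(false, true)->getName() << '\n';  // Int32
}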

View File

@ -523,6 +523,9 @@ void TCPHandler::processOrdinaryQuery()
*/
if (!block && !isQueryCancelled())
{
/// Wait till the inner thread finishes to avoid a possible race with getTotals.
async_in.waitInnerThread();
sendTotals(state.io.in->getTotals());
sendExtremes(state.io.in->getExtremes());
sendProfileInfo(state.io.in->getProfileInfo());
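
The waitInnerThread() call added above exists because the totals are produced by the background thread that drives the asynchronous stream; reading them before that thread has finished would be a race. A minimal sketch of the pattern with hypothetical names (std::thread standing in for the real thread pool):

#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical stand-in for the asynchronous stream: the inner thread fills `totals`.
struct AsyncStream
{
    std::vector<int> totals;
    std::thread inner;
    bool started = false;

    void readPrefix()
    {
        started = true;
        inner = std::thread([this] { totals = {1, 2, 3}; });   // produced asynchronously
    }

    void waitInnerThread()
    {
        if (started && inner.joinable())
            inner.join();
    }
};

int main()
{
    AsyncStream stream;
    stream.readPrefix();
    stream.waitInnerThread();   // without this, reading `totals` below would race with the inner thread
    std::cout << std::accumulate(stream.totals.begin(), stream.totals.end(), 0) << '\n';   // 6
}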

View File

@ -129,6 +129,8 @@ public:
return nested_func->allocatesMemoryInArena();
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -119,23 +119,56 @@ public:
*/
virtual bool isState() const { return false; }
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
/** Contains a loop with calls to the "add" function. You can collect arguments into the "places"
* array and do a single call to "addBatch" for devirtualization and inlining. When "offsets" is not
* null, it behaves like AddBatchArrayFunc (this is used to work around unknown regressions).
*/
using AddBatchFunc = void (*)(
const IAggregateFunction *,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena);
/** The same for single place.
*/
using AddBatchSinglePlaceFunc
= void (*)(const IAggregateFunction *, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena);
/** In addition to the method above, this variant accepts an array of "offsets" which allows
* collecting multiple rows of arguments into the "places" array, as long as they fall between
* offsets[i-1] and offsets[i]. It is used for arrayReduce and might be used more generally to
* break the data dependency when the "places" array contains long consecutive runs of the same
* value.
*/
using AddBatchArrayFunc = void (*)(
const IAggregateFunction *,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena);
struct AddFuncs
{
AddFunc add;
AddBatchFunc add_batch;
AddBatchSinglePlaceFunc add_batch_single_place;
AddBatchArrayFunc add_batch_array;
};
/** The inner loop that uses the function pointer is better than using the virtual function.
* The reason is that in the case of virtual functions, GCC 5.1.2 generates code
* which, at each iteration of the loop, reloads the function address (the offset in the virtual function table) from memory into a register.
* This gives a performance drop of around 12% on simple queries.
* Once better compilers appear, this code can be removed.
*/
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
virtual AddFunc getAddressOfAddFunction() const = 0;
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
virtual AddFuncs getAddressOfAddFunctions() const = 0;
/** This is used for runtime code generation to determine which header files to include in the generated source.
* Always implement it as
@ -162,23 +195,58 @@ private:
static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
}
static void addBatch(
const IAggregateFunction * that,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena)
{
if (offsets)
{
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
current_offset = next_offset;
}
}
else
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, i, arena);
}
static void
addBatchSinglePlaceFree(const IAggregateFunction * that, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena)
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(that)->add(place, columns, i, arena);
}
/// TODO: We cannot use this function directly as it slows down aggregate functions like uniqCombined for unknown reasons.
static void addBatchArrayFree(const IAggregateFunction * that,
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
{
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
current_offset = next_offset;
}
}
public:
IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
: IAggregateFunction(argument_types_, parameters_) {}
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
/// If we return addBatchArrayFree instead of nullptr, it leads to a regression.
AddFuncs getAddressOfAddFunctions() const override { return {&addFree, &addBatch, &addBatchSinglePlaceFree, nullptr}; }
};
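
To make the devirtualization comment above concrete, here is a minimal standalone sketch (hypothetical types, not the real interface): the batch function pointer is fetched once through a virtual call, after which the hot loop runs without per-row virtual dispatch.

#include <cstddef>
#include <iostream>
#include <vector>

struct IAgg
{
    // One virtual lookup returns a plain function pointer; the loop inside it is non-virtual.
    using AddBatchFunc = void (*)(const IAgg *, size_t batch_size, long * places, const int * column);
    virtual AddBatchFunc getAddBatch() const = 0;
    virtual ~IAgg() = default;
};

struct SumAgg : IAgg
{
    static void addBatch(const IAgg *, size_t batch_size, long * places, const int * column)
    {
        for (size_t i = 0; i < batch_size; ++i)
            places[i] += column[i];   // concrete "add", inlined into the loop
    }
    AddBatchFunc getAddBatch() const override { return &addBatch; }
};

int main()
{
    SumAgg agg;
    std::vector<long> places{0, 0, 0};
    std::vector<int> column{5, 7, 9};
    agg.getAddBatch()(&agg, places.size(), places.data(), column.data());
    std::cout << places[0] << ' ' << places[1] << ' ' << places[2] << '\n';   // 5 7 9
}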

View File

@ -357,7 +357,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, allow_experimental_multiple_joins_emulation, true, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, true, "Convert CROSS JOIN to INNER JOIN if possible") \
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.") \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only for 'mysql' table function.") \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only by 'mysql' and 'odbc' table functions.") \
M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.") \
\
M(SettingBool, experimental_use_processors, false, "Use processors pipeline.") \

View File

@ -83,7 +83,7 @@ private:
SimpleAggregateDescription(const AggregateFunctionPtr & function_, const size_t column_number_) : function(function_), column_number(column_number_)
{
add_function = function->getAddressOfAddFunction();
add_function = function->getAddressOfAddFunctions().add;
state.reset(function->sizeOfData(), function->alignOfData());
}

View File

@ -33,6 +33,12 @@ public:
String getName() const override { return "Asynchronous"; }
void waitInnerThread()
{
if (started)
pool.wait();
}
void readPrefix() override
{
/// Do not call `readPrefix` on the child, so that the corresponding actions are performed in a separate thread.

View File

@ -146,6 +146,9 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
}
}
if (subquery.set)
subquery.set->finishInsert();
if (table_out)
table_out->writeSuffix();

View File

@ -188,7 +188,7 @@ Block NativeBlockInputStream::readImpl()
for (auto & col : header)
{
if (res.has(col.name))
tmp_res.insert(std::move(res.getByName(col.name)));
tmp_res.insert(res.getByName(col.name));
else
tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name});
}

View File

@ -12,7 +12,7 @@ namespace DB
class OneBlockInputStream : public IBlockInputStream
{
public:
explicit OneBlockInputStream(const Block & block_) : block(block_) {}
explicit OneBlockInputStream(Block block_) : block(std::move(block_)) { block.checkNumberOfRows(); }
String getName() const override { return "One"; }

View File

@ -203,6 +203,19 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
{
BlockInputStreamPtr in;
/// We need to keep the InterpreterSelectQuery alive until the processing is finished, since:
///
/// - We copy the Context inside InterpreterSelectQuery to support
/// modification of the context (Settings) for subqueries
/// - InterpreterSelectQuery lives shorter than the query pipeline.
/// It is used just to build the query pipeline and is no longer needed afterwards
/// - ExpressionAnalyzer and then Functions that are created in InterpreterSelectQuery
/// **can** take a reference to the Context from InterpreterSelectQuery
/// (the problem arises only when a function uses the context from the
/// execute*() method, like FunctionDictGet does)
/// - These objects live inside the query pipeline (DataStreams), and the reference becomes dangling.
std::optional<InterpreterSelectQuery> select;
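
A minimal standalone sketch of the lifetime rule behind this change, with hypothetical classes: the object that owns the context must be kept (here in a std::optional in the enclosing scope) at least as long as the stream that captured a reference into it.

#include <functional>
#include <iostream>
#include <optional>

// Hypothetical stand-in for an interpreter whose internals are referenced by the stream it builds.
struct Interpreter
{
    int context_value = 42;
    std::function<int()> execute() { return [this] { return context_value; }; }   // captures `this`
};

int main()
{
    std::optional<Interpreter> interpreter;   // outlives the stream built from it
    std::function<int()> stream;

    interpreter.emplace();
    stream = interpreter->execute();

    // Had `interpreter` been a temporary inside a smaller scope, this call
    // would read through a dangling reference.
    std::cout << stream() << '\n';   // 42
}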
if (view.query)
{
/// We create a table with the same name as original table and the same alias columns,
@ -212,8 +225,8 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
local_context.addViewSource(
StorageValues::create(storage->getDatabaseName(), storage->getTableName(), storage->getColumns(),
block));
InterpreterSelectQuery select(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().in);
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY

View File

@ -2,6 +2,7 @@
#include <DataStreams/OneBlockInputStream.h>
#include <Common/NetException.h>
#include <Common/CurrentThread.h>
#include <Columns/ColumnConst.h>
#include <Interpreters/Context.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/InternalTextLogsQueue.h>
@ -173,9 +174,30 @@ static Block adaptBlockStructure(const Block & block, const Block & header, cons
ColumnPtr column;
if (elem.column && isColumnConst(*elem.column))
/// TODO: check that column from block contains the same value.
{
/// We expect a constant column in the block.
/// If the block is not empty, take the value for the constant from it,
/// because it may differ on the remote server for functions like version(), uptime(), ...
if (block.rows() > 0 && block.has(elem.name))
{
/// Const column is passed as materialized. Get first value from it.
///
/// TODO: check that column contains the same value.
/// TODO: serialize const columns.
auto col = block.getByName(elem.name);
col.column = block.getByName(elem.name).column->cut(0, 1);
column = castColumn(col, elem.type, context);
if (!isColumnConst(*column))
column = ColumnConst::create(column, block.rows());
else
/// This is not possible now; it's here just in case we support const column serialization.
column = column->cloneResized(block.rows());
}
else
column = elem.column->cloneResized(block.rows());
}
else
column = castColumn(block.getByName(elem.name), elem.type, context);

View File

@ -84,7 +84,7 @@ private:
void init(const char * function_name, const DataTypes & argument_types)
{
function = AggregateFunctionFactory::instance().get(function_name, argument_types);
add_function = function->getAddressOfAddFunction();
add_function = function->getAddressOfAddFunctions().add;
state.reset(function->sizeOfData(), function->alignOfData());
}

View File

@ -110,8 +110,7 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
if (!create)
{
std::ostringstream query_stream;
//FIXME WTF
formatAST(*create, query_stream, true);
formatAST(*query, query_stream, true);
throw Exception("Query '" + query_stream.str() + "' is not CREATE query", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -18,6 +18,7 @@ namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
}
const ColumnConst * checkAndGetColumnConstStringOrFixedString(const IColumn * column)
@ -118,4 +119,28 @@ void validateArgumentType(const IFunction & func, const DataTypes & arguments,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
std::pair<std::vector<const IColumn *>, const ColumnArray::Offset *>
checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments)
{
assert(num_arguments > 0);
std::vector<const IColumn *> nested_columns(num_arguments);
const ColumnArray::Offsets * offsets = nullptr;
for (size_t i = 0; i < num_arguments; ++i)
{
const ColumnArray::Offsets * offsets_i = nullptr;
if (const ColumnArray * arr = checkAndGetColumn<const ColumnArray>(columns[i]))
{
nested_columns[i] = &arr->getData();
offsets_i = &arr->getOffsets();
}
else
throw Exception("Illegal column " + columns[i]->getName() + " as argument of function", ErrorCodes::ILLEGAL_COLUMN);
if (i == 0)
offsets = offsets_i;
else if (*offsets_i != *offsets)
throw Exception("Lengths of all arrays passed to aggregate function must be equal.", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
return {nested_columns, offsets->data()};
}
}
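
A standalone sketch of the invariant checkAndGetNestedArrayOffset enforces above, with plain vectors standing in for ColumnArray offsets: every array argument must share identical per-row offsets (i.e. the same row lengths), otherwise the call is rejected.

#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <vector>

void checkEqualOffsets(const std::vector<std::vector<size_t>> & offsets_per_column)
{
    for (size_t i = 1; i < offsets_per_column.size(); ++i)
        if (offsets_per_column[i] != offsets_per_column[0])
            throw std::invalid_argument("Lengths of all arrays passed to aggregate function must be equal.");
}

int main()
{
    checkEqualOffsets({{2, 5}, {2, 5}});        // ok: both columns have rows of length 2 and 3
    try
    {
        checkEqualOffsets({{2, 5}, {3, 5}});    // first rows differ in length
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }
}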

View File

@ -4,6 +4,7 @@
#include <Common/assert_cast.h>
#include <DataTypes/IDataType.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Core/Block.h>
#include <Core/ColumnNumbers.h>
@ -89,4 +90,8 @@ void validateArgumentType(const IFunction & func, const DataTypes & arguments,
size_t argument_index, bool (* validator_func)(const IDataType &),
const char * expected_type_description);
/// Checks whether a list of array columns has equal offsets. Returns a pair of nested columns and offsets if so, otherwise throws.
std::pair<std::vector<const IColumn *>, const ColumnArray::Offset *>
checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments);
}

View File

@ -19,6 +19,7 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnDecimal.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
@ -949,7 +950,8 @@ public:
if (!which.isStringOrFixedString() &&
!which.isDateOrDateTime() &&
!which.isUInt() &&
!which.isFloat())
!which.isFloat() &&
!which.isDecimal())
throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
@ -1023,36 +1025,39 @@ public:
}
template <typename T>
bool tryExecuteFloat(const IColumn * col, ColumnPtr & col_res)
{
const ColumnVector<T> * col_vec = checkAndGetColumn<ColumnVector<T>>(col);
static constexpr size_t FLOAT_HEX_LENGTH = sizeof(T) * 2 + 1; /// Including trailing zero byte.
if (col_vec)
void executeFloatAndDecimal(const T & in_vec, ColumnPtr & col_res, const size_t type_size_in_bytes)
{
const size_t hex_length = type_size_in_bytes * 2 + 1; /// Including trailing zero byte.
auto col_str = ColumnString::create();
ColumnString::Chars & out_vec = col_str->getChars();
ColumnString::Offsets & out_offsets = col_str->getOffsets();
const typename ColumnVector<T>::Container & in_vec = col_vec->getData();
size_t size = in_vec.size();
out_offsets.resize(size);
out_vec.resize(size * FLOAT_HEX_LENGTH);
out_vec.resize(size * hex_length);
size_t pos = 0;
char * out = reinterpret_cast<char *>(&out_vec[0]);
for (size_t i = 0; i < size; ++i)
{
const UInt8 * in_pos = reinterpret_cast<const UInt8 *>(&in_vec[i]);
executeOneString(in_pos, in_pos + sizeof(T), out);
executeOneString(in_pos, in_pos + type_size_in_bytes, out);
pos += FLOAT_HEX_LENGTH;
pos += hex_length;
out_offsets[i] = pos;
}
col_res = std::move(col_str);
}
template <typename T>
bool tryExecuteFloat(const IColumn * col, ColumnPtr & col_res)
{
const ColumnVector<T> * col_vec = checkAndGetColumn<ColumnVector<T>>(col);
if (col_vec)
{
const typename ColumnVector<T>::Container & in_vec = col_vec->getData();
executeFloatAndDecimal<typename ColumnVector<T>::Container>(in_vec, col_res, sizeof(T));
return true;
}
else
@ -1061,6 +1066,23 @@ public:
}
}
template <typename T>
bool tryExecuteDecimal(const IColumn * col, ColumnPtr & col_res)
{
const ColumnDecimal<T> * col_dec = checkAndGetColumn<ColumnDecimal<T>>(col);
if (col_dec)
{
const typename ColumnDecimal<T>::Container & in_vec = col_dec->getData();
executeFloatAndDecimal<typename ColumnDecimal<T>::Container>(in_vec, col_res, sizeof(T));
return true;
}
else
{
return false;
}
}
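
The shared executeFloatAndDecimal path above writes each value's raw bytes as hexadecimal. A rough standalone sketch of that idea (not necessarily byte-for-byte what ClickHouse's hex() produces, and without the trailing zero byte it appends per value):

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

// Dump the in-memory bytes of a fixed-size value as two hex characters per byte.
template <typename T>
std::string rawHex(const T & value)
{
    static const char digits[] = "0123456789ABCDEF";
    unsigned char bytes[sizeof(T)];
    std::memcpy(bytes, &value, sizeof(T));
    std::string out;
    for (unsigned char b : bytes)
    {
        out += digits[b >> 4];
        out += digits[b & 0x0F];
    }
    return out;
}

int main()
{
    int64_t decimal_raw = 12345;   // e.g. the underlying Int64 of a Decimal64 value
    std::cout << rawHex(decimal_raw) << '\n';   // 3930000000000000 on a little-endian machine
}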
void executeOneString(const UInt8 * pos, const UInt8 * end, char *& out)
{
while (pos < end)
@ -1177,7 +1199,10 @@ public:
tryExecuteString(column, res_column) ||
tryExecuteFixedString(column, res_column) ||
tryExecuteFloat<Float32>(column, res_column) ||
tryExecuteFloat<Float64>(column, res_column))
tryExecuteFloat<Float64>(column, res_column) ||
tryExecuteDecimal<Decimal32>(column, res_column) ||
tryExecuteDecimal<Decimal64>(column, res_column) ||
tryExecuteDecimal<Decimal128>(column, res_column))
return;
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()

View File

@ -23,6 +23,7 @@ void registerFunctionsHashing(FunctionFactory & factory)
factory.registerFunction<FunctionIntHash64>();
factory.registerFunction<FunctionURLHash>();
factory.registerFunction<FunctionJavaHash>();
factory.registerFunction<FunctionJavaHashUTF16LE>();
factory.registerFunction<FunctionHiveHash>();
factory.registerFunction<FunctionMurmurHash2_32>();
factory.registerFunction<FunctionMurmurHash2_64>();

View File

@ -353,6 +353,41 @@ struct JavaHashImpl
static constexpr bool use_int_hash_for_pods = false;
};
struct JavaHashUTF16LEImpl
{
static constexpr auto name = "javaHashUTF16LE";
using ReturnType = Int32;
static Int32 apply(const char * raw_data, const size_t raw_size)
{
char * data = const_cast<char *>(raw_data);
size_t size = raw_size;
// Remove the UTF-16LE byte order mark (bytes 0xFF 0xFE)
if (size >= 2 && data[0] == '\xFF' && data[1] == '\xFE')
{
data += 2;
size -= 2;
}
if (size % 2 != 0)
throw Exception("Arguments for javaHashUTF16LE must be in the form of UTF-16", ErrorCodes::LOGICAL_ERROR);
UInt32 h = 0;
for (size_t i = 0; i < size; i += 2)
h = 31 * h + static_cast<UInt16>(static_cast<UInt8>(data[i]) | static_cast<UInt8>(data[i + 1]) << 8);
return static_cast<Int32>(h);
}
static Int32 combineHashes(Int32, Int32)
{
throw Exception("Java hash is not combinable for multiple arguments", ErrorCodes::NOT_IMPLEMENTED);
}
static constexpr bool use_int_hash_for_pods = false;
};
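
A standalone sketch of the recurrence JavaHashUTF16LEImpl applies above: Java's String.hashCode() is h = 31 * h + c over UTF-16 code units. Here the input is already a sequence of 16-bit code units, so the byte handling from the struct above is not repeated.

#include <cstdint>
#include <iostream>
#include <string>

int32_t javaStringHash(const std::u16string & s)
{
    uint32_t h = 0;
    for (char16_t c : s)
        h = 31 * h + static_cast<uint16_t>(c);   // same recurrence as Java's String.hashCode()
    return static_cast<int32_t>(h);
}

int main()
{
    std::cout << javaStringHash(u"abc") << '\n';   // 96354, matching "abc".hashCode() in Java
}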
/// This is just JavaHash with the sign bit zeroed out.
/// This function is used in Hive versions before 3.0;
/// starting with 3.0, Hive uses murmur-hash3.
@ -1102,6 +1137,7 @@ using FunctionMurmurHash3_32 = FunctionAnyHash<MurmurHash3Impl32>;
using FunctionMurmurHash3_64 = FunctionAnyHash<MurmurHash3Impl64>;
using FunctionMurmurHash3_128 = FunctionStringHashFixedString<MurmurHash3Impl128>;
using FunctionJavaHash = FunctionAnyHash<JavaHashImpl>;
using FunctionJavaHashUTF16LE = FunctionAnyHash<JavaHashUTF16LEImpl>;
using FunctionHiveHash = FunctionAnyHash<HiveHashImpl>;
#if USE_XXHASH

View File

@ -65,7 +65,7 @@ public:
{
/// Choose JSONParser.
#if USE_SIMDJSON
if (context.getSettings().allow_simdjson && Cpu::CpuFlagsCache::have_SSE42 && Cpu::CpuFlagsCache::have_PCLMUL)
if (context.getSettingsRef().allow_simdjson && Cpu::CpuFlagsCache::have_SSE42 && Cpu::CpuFlagsCache::have_PCLMUL)
{
Executor<SimdJSONParser>::run(block, arguments, result_pos, input_rows_count);
return;

View File

@ -7,11 +7,14 @@
#include <Columns/ColumnAggregateFunction.h>
#include <IO/WriteHelpers.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Common/AlignedBuffer.h>
#include <Common/Arena.h>
#include <ext/scope_guard.h>
namespace DB
{
@ -106,10 +109,7 @@ DataTypePtr FunctionArrayReduce::getReturnTypeImpl(const ColumnsWithTypeAndName
void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
IAggregateFunction & agg_func = *aggregate_function.get();
AlignedBuffer place_holder(agg_func.sizeOfData(), agg_func.alignOfData());
AggregateDataPtr place = place_holder.data();
std::unique_ptr<Arena> arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
std::unique_ptr<Arena> arena = std::make_unique<Arena>();
/// Aggregate functions do not support constant columns. Therefore, we materialize them.
std::vector<ColumnPtr> materialized_columns;
@ -157,32 +157,41 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
throw Exception("State function " + agg_func.getName() + " inserts results into non-state column "
+ block.getByPosition(result).type->getName(), ErrorCodes::ILLEGAL_COLUMN);
ColumnArray::Offset current_offset = 0;
PODArray<AggregateDataPtr> places(input_rows_count);
for (size_t i = 0; i < input_rows_count; ++i)
{
agg_func.create(place);
ColumnArray::Offset next_offset = (*offsets)[i];
places[i] = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData());
try
{
for (size_t j = current_offset; j < next_offset; ++j)
agg_func.add(place, aggregate_arguments, j, arena.get());
if (!res_col_aggregate_function)
agg_func.insertResultInto(place, res_col);
else
res_col_aggregate_function->insertFrom(place);
agg_func.create(places[i]);
}
catch (...)
{
agg_func.destroy(place);
agg_func.destroy(places[i]);
throw;
}
agg_func.destroy(place);
current_offset = next_offset;
}
SCOPE_EXIT({
for (size_t i = 0; i < input_rows_count; ++i)
agg_func.destroy(places[i]);
});
{
auto that = &agg_func;
/// Unnest consecutive trailing -State combinators
while (auto func = typeid_cast<AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
that->getAddressOfAddFunctions().add_batch(
that, input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)
if (!res_col_aggregate_function)
agg_func.insertResultInto(places[i], res_col);
else
res_col_aggregate_function->insertFrom(places[i]);
block.getByPosition(result).column = std::move(result_holder);
}
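
A minimal standalone sketch of the offsets-driven batch add that arrayReduce now relies on (hypothetical types, a sum-like state): row i aggregates the flattened values in the half-open range [offsets[i-1], offsets[i]).

#include <cstddef>
#include <iostream>
#include <vector>

void addBatchWithOffsets(size_t batch_size, long * places, const int * values, const size_t * offsets)
{
    size_t current_offset = 0;
    for (size_t i = 0; i < batch_size; ++i)
    {
        for (size_t j = current_offset; j < offsets[i]; ++j)
            places[i] += values[j];   // the per-row "add" of a sum-like state
        current_offset = offsets[i];
    }
}

int main()
{
    // Two array rows, [1, 2, 3] and [10, 20], flattened into one values column.
    std::vector<int> values{1, 2, 3, 10, 20};
    std::vector<size_t> offsets{3, 5};
    std::vector<long> places{0, 0};
    addBatchWithOffsets(places.size(), places.data(), values.data(), offsets.data());
    std::cout << places[0] << ' ' << places[1] << '\n';   // 6 30
}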

View File

@ -1,9 +1,12 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/getLeastSupertype.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Interpreters/castColumn.h>
#include <numeric>
@ -15,6 +18,7 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
@ -22,30 +26,41 @@ class FunctionRange : public IFunction
{
public:
static constexpr auto name = "range";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionRange>(); }
static constexpr size_t max_elements = 100'000'000;
static FunctionPtr create(const Context & context_) { return std::make_shared<FunctionRange>(context_); }
FunctionRange(const Context & context_) : context(context_) {}
private:
const Context & context;
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const DataTypePtr & arg = arguments.front();
if (arguments.size() > 3 || arguments.empty())
{
throw Exception{"Function " + getName() + " needs 1..3 arguments; passed "
+ std::to_string(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
}
for (const auto & arg : arguments)
{
if (!isUnsignedInteger(arg))
throw Exception{"Illegal type " + arg->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
return std::make_shared<DataTypeArray>(arg);
DataTypePtr common_type = getLeastSupertype(arguments);
return std::make_shared<DataTypeArray>(common_type);
}
template <typename T>
bool executeInternal(Block & block, const IColumn * arg, const size_t result)
{
static constexpr size_t max_elements = 100'000'000;
if (const auto in = checkAndGetColumn<ColumnVector<T>>(arg))
{
const auto & in_data = in->getData();
@ -88,10 +103,232 @@ private:
return false;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t) override
template <typename T>
bool executeConstStartStep(Block & block, const IColumn * end_arg, const T start, const T step, const size_t input_rows_count, const size_t result)
{
auto end_column = checkAndGetColumn<ColumnVector<T>>(end_arg);
if (!end_column)
{
return false;
}
const auto & end_data = end_column->getData();
size_t total_values = 0;
size_t pre_values = 0;
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
if (start < end_data[row_idx] && step == 0)
throw Exception{"A call to function " + getName() + " overflows, the 3rd argument step can't be zero",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
pre_values += start >= end_data[row_idx] ? 0
: (end_data[row_idx] - start - 1) / step + 1;
if (pre_values < total_values)
throw Exception{"A call to function " + getName() + " overflows, investigate the values of arguments you are passing",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
total_values = pre_values;
if (total_values > max_elements)
throw Exception{"A call to function " + getName() + " would produce " + std::to_string(total_values) +
" array elements, which is greater than the allowed maximum of " + std::to_string(max_elements),
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
}
auto data_col = ColumnVector<T>::create(total_values);
auto offsets_col = ColumnArray::ColumnOffsets::create(end_column->size());
auto & out_data = data_col->getData();
auto & out_offsets = offsets_col->getData();
IColumn::Offset offset{};
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
for (size_t st = start, ed = end_data[row_idx]; st < ed; st += step)
out_data[offset++] = st;
out_offsets[row_idx] = offset;
}
block.getByPosition(result).column = ColumnArray::create(std::move(data_col), std::move(offsets_col));
return true;
}
template <typename T>
bool executeConstStep(Block & block, const IColumn * start_arg, const IColumn * end_arg, const T step, const size_t input_rows_count, const size_t result)
{
auto start_column = checkAndGetColumn<ColumnVector<T>>(start_arg);
auto end_column = checkAndGetColumn<ColumnVector<T>>(end_arg);
if (!end_column || !start_column)
{
return false;
}
const auto & start_data = start_column->getData();
const auto & end_data = end_column->getData();
size_t total_values = 0;
size_t pre_values = 0;
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
if (start_data[row_idx] < end_data[row_idx] && step == 0)
throw Exception{"A call to function " + getName() + " overflows, the 3rd argument step can't be zero",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
pre_values += start_data[row_idx] >= end_data[row_idx] ? 0
: (end_data[row_idx] - start_data[row_idx] - 1) / step + 1;
if (pre_values < total_values)
throw Exception{"A call to function " + getName() + " overflows, investigate the values of arguments you are passing",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
total_values = pre_values;
if (total_values > max_elements)
throw Exception{"A call to function " + getName() + " would produce " + std::to_string(total_values) +
" array elements, which is greater than the allowed maximum of " + std::to_string(max_elements),
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
}
auto data_col = ColumnVector<T>::create(total_values);
auto offsets_col = ColumnArray::ColumnOffsets::create(end_column->size());
auto & out_data = data_col->getData();
auto & out_offsets = offsets_col->getData();
IColumn::Offset offset{};
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
for (size_t st = start_data[row_idx], ed = end_data[row_idx]; st < ed; st += step)
out_data[offset++] = st;
out_offsets[row_idx] = offset;
}
block.getByPosition(result).column = ColumnArray::create(std::move(data_col), std::move(offsets_col));
return true;
}
template <typename T>
bool executeConstStart(Block & block, const IColumn * end_arg, const IColumn * step_arg, const T start, const size_t input_rows_count, const size_t result)
{
auto end_column = checkAndGetColumn<ColumnVector<T>>(end_arg);
auto step_column = checkAndGetColumn<ColumnVector<T>>(step_arg);
if (!end_column || !step_column)
{
return false;
}
const auto & end_data = end_column->getData();
const auto & step_data = step_column->getData();
size_t total_values = 0;
size_t pre_values = 0;
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
if (start < end_data[row_idx] && step_data[row_idx] == 0)
throw Exception{"A call to function " + getName() + " overflows, the 3rd argument step can't be zero",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
pre_values += start >= end_data[row_idx] ? 0
: (end_data[row_idx] - start - 1) / step_data[row_idx] + 1;
if (pre_values < total_values)
throw Exception{"A call to function " + getName() + " overflows, investigate the values of arguments you are passing",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
total_values = pre_values;
if (total_values > max_elements)
throw Exception{"A call to function " + getName() + " would produce " + std::to_string(total_values) +
" array elements, which is greater than the allowed maximum of " + std::to_string(max_elements),
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
}
auto data_col = ColumnVector<T>::create(total_values);
auto offsets_col = ColumnArray::ColumnOffsets::create(end_column->size());
auto & out_data = data_col->getData();
auto & out_offsets = offsets_col->getData();
IColumn::Offset offset{};
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
for (size_t st = start, ed = end_data[row_idx]; st < ed; st += step_data[row_idx])
out_data[offset++] = st;
out_offsets[row_idx] = offset;
}
block.getByPosition(result).column = ColumnArray::create(std::move(data_col), std::move(offsets_col));
return true;
}
template <typename T>
bool executeGeneric(Block & block, const IColumn * start_col, const IColumn * end_col, const IColumn * step_col, const size_t input_rows_count, const size_t result)
{
auto start_column = checkAndGetColumn<ColumnVector<T>>(start_col);
auto end_column = checkAndGetColumn<ColumnVector<T>>(end_col);
auto step_column = checkAndGetColumn<ColumnVector<T>>(step_col);
if (!start_column || !end_column || !step_column)
{
return false;
}
const auto & start_data = start_column->getData();
const auto & end_start = end_column->getData();
const auto & step_data = step_column->getData();
size_t total_values = 0;
size_t pre_values = 0;
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
if (start_data[row_idx] < end_start[row_idx] && step_data[row_idx] == 0)
throw Exception{"A call to function " + getName() + " overflows, the 3rd argument step can't be zero",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
pre_values += start_data[row_idx] >= end_start[row_idx] ? 0
: (end_start[row_idx] - start_data[row_idx] - 1) / (step_data[row_idx]) + 1;
if (pre_values < total_values)
throw Exception{"A call to function " + getName() + " overflows, investigate the values of arguments you are passing",
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
total_values = pre_values;
if (total_values > max_elements)
throw Exception{"A call to function " + getName() + " would produce " + std::to_string(total_values) +
" array elements, which is greater than the allowed maximum of " + std::to_string(max_elements),
ErrorCodes::ARGUMENT_OUT_OF_BOUND};
}
auto data_col = ColumnVector<T>::create(total_values);
auto offsets_col = ColumnArray::ColumnOffsets::create(end_column->size());
auto & out_data = data_col->getData();
auto & out_offsets = offsets_col->getData();
IColumn::Offset offset{};
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
for (size_t st = start_data[row_idx], ed = end_start[row_idx]; st < ed; st += step_data[row_idx])
out_data[offset++] = st;
out_offsets[row_idx] = offset;
}
block.getByPosition(result).column = ColumnArray::create(std::move(data_col), std::move(offsets_col));
return true;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
if (arguments.size() == 1)
{
const auto col = block.getByPosition(arguments[0]).column.get();
if (!executeInternal<UInt8>(block, col, result) &&
!executeInternal<UInt16>(block, col, result) &&
!executeInternal<UInt32>(block, col, result) &&
@ -99,7 +336,76 @@ private:
{
throw Exception{"Illegal column " + col->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN};
}
return;
}
Columns columns_holder(3);
ColumnRawPtrs columns(3);
const auto return_type = checkAndGetDataType<DataTypeArray>(block.getByPosition(result).type.get())->getNestedType();
for (size_t i = 0; i < arguments.size(); ++i)
{
if (i == 1)
columns_holder[i] = castColumn(block.getByPosition(arguments[i]), return_type, context)->convertToFullColumnIfConst();
else
columns_holder[i] = castColumn(block.getByPosition(arguments[i]), return_type, context);
columns[i] = columns_holder[i].get();
}
// The step column defaults to 1 when omitted.
if (arguments.size() == 2)
{
columns_holder[2] = return_type->createColumnConst(input_rows_count, 1);
columns[2] = columns_holder[2].get();
}
bool is_start_const = isColumnConst(*columns[0]);
bool is_step_const = isColumnConst(*columns[2]);
bool ok;
if (is_start_const && is_step_const)
{
UInt64 start = assert_cast<const ColumnConst &>(*columns[0]).getUInt(0);
UInt64 step = assert_cast<const ColumnConst &>(*columns[2]).getUInt(0);
ok = executeConstStartStep<UInt8>(block, columns[1], start, step, input_rows_count, result) ||
executeConstStartStep<UInt16>(block, columns[1], start, step, input_rows_count, result) ||
executeConstStartStep<UInt32>(block, columns[1], start, step, input_rows_count, result) ||
executeConstStartStep<UInt64>(block, columns[1], start, step, input_rows_count, result);
}
else if (is_start_const && !is_step_const)
{
UInt64 start = assert_cast<const ColumnConst &>(*columns[0]).getUInt(0);
ok = executeConstStart<UInt8>(block, columns[1], columns[2], start, input_rows_count, result) ||
executeConstStart<UInt16>(block, columns[1], columns[2], start, input_rows_count, result) ||
executeConstStart<UInt32>(block, columns[1], columns[2], start, input_rows_count, result) ||
executeConstStart<UInt64>(block, columns[1], columns[2], start, input_rows_count, result);
}
else if (!is_start_const && is_step_const)
{
UInt64 step = assert_cast<const ColumnConst &>(*columns[2]).getUInt(0);
ok = executeConstStep<UInt8>(block, columns[0], columns[1], step, input_rows_count, result) ||
executeConstStep<UInt16>(block, columns[0], columns[1], step, input_rows_count, result) ||
executeConstStep<UInt32>(block, columns[0], columns[1], step, input_rows_count, result) ||
executeConstStep<UInt64>(block, columns[0], columns[1], step, input_rows_count, result);
}
else
{
ok = executeGeneric<UInt8>(block, columns[0], columns[1], columns[2], input_rows_count, result) ||
executeGeneric<UInt16>(block, columns[0], columns[1], columns[2], input_rows_count, result) ||
executeGeneric<UInt32>(block, columns[0], columns[1], columns[2], input_rows_count, result) ||
executeGeneric<UInt64>(block, columns[0], columns[1], columns[2], input_rows_count, result);
}
if (!ok)
{
throw Exception{"Illegal columns " + columns[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN};
}
}
};
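
All four execute* variants above share the same per-row size arithmetic. A standalone sketch of just that computation: a row contributes 0 elements when start >= end, otherwise (end - start - 1) / step + 1, and a zero step with a non-empty interval is rejected.

#include <cstdint>
#include <iostream>
#include <stdexcept>

uint64_t rangeLength(uint64_t start, uint64_t end, uint64_t step)
{
    if (start >= end)
        return 0;
    if (step == 0)
        throw std::invalid_argument("step cannot be zero for a non-empty range");
    return (end - start - 1) / step + 1;
}

int main()
{
    std::cout << rangeLength(0, 10, 1) << '\n';   // 10
    std::cout << rangeLength(2, 10, 3) << '\n';   // 3, i.e. elements 2, 5, 8
    std::cout << rangeLength(5, 5, 1) << '\n';    // 0
}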

View File

@ -138,15 +138,9 @@ public:
Int64 length_value = 0;
if (column_start_const)
{
start_value = column_start_const->getInt(0);
}
if (column_length_const)
{
length_value = column_length_const->getInt(0);
if (length_value < 0)
throw Exception("Third argument provided for function substring could not be negative.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if constexpr (is_utf8)
{

View File

@ -6,6 +6,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnString.h>
#include <Interpreters/Context.h>
#include <thread>
#include <memory>
@ -27,13 +28,18 @@ namespace ErrorCodes
/// Various illegal actions to test diagnostic features of ClickHouse itself. Should not be enabled in production builds.
class FunctionTrap : public IFunction
{
private:
const Context & context;
public:
static constexpr auto name = "trap";
static FunctionPtr create(const Context &)
static FunctionPtr create(const Context & context)
{
return std::make_shared<FunctionTrap>();
return std::make_shared<FunctionTrap>(context);
}
FunctionTrap(const Context & context_) : context(context_) {}
String getName() const override
{
return name;
@ -114,6 +120,10 @@ public:
t1.join();
t2.join();
}
else if (mode == "access context")
{
(void)context.getCurrentQueryId();
}
else
throw Exception("Unknown trap mode", ErrorCodes::BAD_ARGUMENTS);
}

View File

@ -26,6 +26,8 @@
#include <Common/assert_cast.h>
#include <common/demangle.h>
#include <common/config_common.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
namespace ProfileEvents
@ -450,7 +452,7 @@ void NO_INLINE Aggregator::executeImplCase(
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
(*inst->funcs.add)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
}
}
@ -492,7 +494,10 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool);
{
(*inst->batch_funcs.add_batch)(
inst->batch_that, rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
}
}
@ -504,7 +509,13 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
inst->that->addBatchSinglePlace(rows, res + inst->state_offset, inst->arguments, arena);
{
if (inst->offsets)
(*inst->batch_funcs.add_batch_single_place)(
inst->batch_that, inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
else
(*inst->batch_funcs.add_batch_single_place)(inst->batch_that, rows, res + inst->state_offset, inst->batch_arguments, arena);
}
}
@ -564,6 +575,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
aggregate_functions_instructions[params.aggregates_size].that = nullptr;
std::vector<std::vector<const IColumn *>> nested_columns_holder;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
@ -579,10 +591,31 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
}
}
aggregate_functions_instructions[i].that = aggregate_functions[i];
aggregate_functions_instructions[i].func = aggregate_functions[i]->getAddressOfAddFunction();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
auto that = aggregate_functions[i];
/// Unnest consecutive trailing -State combinators
while (auto func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
aggregate_functions_instructions[i].that = that;
aggregate_functions_instructions[i].funcs = that->getAddressOfAddFunctions();
if (auto func = typeid_cast<const AggregateFunctionArray *>(that))
{
/// Unnest consecutive -State combinators before -Array
that = func->getNestedFunction().get();
while (auto nested_func = typeid_cast<const AggregateFunctionState *>(that))
that = nested_func->getNestedFunction().get();
auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
nested_columns_holder.push_back(std::move(nested_columns));
aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
aggregate_functions_instructions[i].offsets = offsets;
}
else
aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].batch_that = that;
aggregate_functions_instructions[i].batch_funcs = that->getAddressOfAddFunctions();
}
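
A minimal standalone sketch of the combinator-unwrapping loop above, with hypothetical classes: wrapping decorators (the -State combinator in the real code) are peeled off until the innermost function is reached, so batch calls go straight to it.

#include <iostream>
#include <memory>
#include <string>

struct IAgg
{
    virtual ~IAgg() = default;
    virtual std::string getName() const { return "sum"; }
};

// Hypothetical stand-in for a "-State"-like wrapper that delegates to a nested function.
struct StateWrapper : IAgg
{
    std::shared_ptr<IAgg> nested;
    explicit StateWrapper(std::shared_ptr<IAgg> nested_) : nested(std::move(nested_)) {}
    std::string getName() const override { return nested->getName() + "State"; }
    const IAgg * getNestedFunction() const { return nested.get(); }
};

const IAgg * unwrapState(const IAgg * that)
{
    while (auto * wrapper = dynamic_cast<const StateWrapper *>(that))
        that = wrapper->getNestedFunction();
    return that;
}

int main()
{
    auto wrapped = std::make_shared<StateWrapper>(std::make_shared<StateWrapper>(std::make_shared<IAgg>()));
    std::cout << wrapped->getName() << " -> " << unwrapState(wrapped.get())->getName() << '\n';   // sumStateState -> sum
}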
if (isCancelled())

View File

@ -1005,9 +1005,13 @@ protected:
struct AggregateFunctionInstruction
{
const IAggregateFunction * that;
IAggregateFunction::AddFunc func;
IAggregateFunction::AddFuncs funcs;
size_t state_offset;
const IColumn ** arguments;
const IAggregateFunction * batch_that;
IAggregateFunction::AddFuncs batch_funcs;
const IColumn ** batch_arguments;
const UInt64 * offsets = nullptr;
};
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;

View File

@ -4,6 +4,7 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
@ -34,12 +35,14 @@ SelectStreamFactory::SelectStreamFactory(
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables_)
: header(header_),
processed_stage{processed_stage_},
main_table(std::move(main_table_)),
table_func_ptr{nullptr},
scalars{scalars_},
has_virtual_shard_num_column(has_virtual_shard_num_column_),
external_tables{external_tables_}
{
}
@ -49,11 +52,13 @@ SelectStreamFactory::SelectStreamFactory(
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables_)
: header(header_),
processed_stage{processed_stage_},
table_func_ptr{table_func_ptr_},
scalars{scalars_},
has_virtual_shard_num_column(has_virtual_shard_num_column_),
external_tables{external_tables_}
{
}
@ -81,23 +86,38 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context &
return stream;
}
static String formattedAST(const ASTPtr & ast)
{
if (!ast)
return "";
std::stringstream ss;
formatAST(*ast, ss, false, true);
return ss.str();
}
}
void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const String &, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
{
auto modified_query_ast = query_ast->clone();
if (has_virtual_shard_num_column)
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(query_ast, context, processed_stage));
res.emplace_back(createLocalStream(modified_query_ast, context, processed_stage));
};
String modified_query = formattedAST(modified_query_ast);
auto emplace_remote_stream = [&]()
{
auto stream = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
stream->setMainTable(main_table);
@ -194,7 +214,7 @@ void SelectStreamFactory::createForShard(
/// Do it lazily to avoid connecting in the main thread.
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay]()
-> BlockInputStreamPtr
@ -229,7 +249,7 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return createLocalStream(query_ast, context, stage);
return createLocalStream(modified_query_ast, context, stage);
else
{
std::vector<IConnectionPool::Entry> connections;
@ -238,7 +258,7 @@ void SelectStreamFactory::createForShard(
connections.emplace_back(std::move(try_result.entry));
return std::make_shared<RemoteBlockInputStream>(
std::move(connections), query, header, context, nullptr, throttler, scalars, external_tables, stage);
std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
}
};

View File

@ -19,6 +19,7 @@ public:
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables);
/// TableFunction in a query.
@ -27,6 +28,7 @@ public:
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables_);
void createForShard(
@ -41,6 +43,7 @@ private:
QualifiedTableName main_table;
ASTPtr table_func_ptr;
Scalars scalars;
bool has_virtual_shard_num_column = false;
Tables external_tables;
};

View File

@ -1189,8 +1189,9 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) con
/// Check whether the column is of the form `something IN (empty set)`.
String set_to_check;
for (auto & action : actions)
for (auto it = actions.rbegin(); it != actions.rend(); ++it)
{
auto & action = *it;
if (action.type == action.APPLY_FUNCTION && action.function_base)
{
auto name = action.function_base->getName();
@ -1199,6 +1200,7 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) con
&& action.argument_names.size() > 1)
{
set_to_check = action.argument_names[1];
break;
}
}
}
@ -1212,7 +1214,7 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) con
// Constant ColumnSet cannot be empty, so we only need to check non-constant ones.
if (auto * column_set = checkAndGetColumn<const ColumnSet>(action.added_column.get()))
{
if (column_set->getData()->getTotalRowCount() == 0)
if (column_set->getData()->isCreated() && column_set->getData()->getTotalRowCount() == 0)
return true;
}
}

View File

@ -249,6 +249,8 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
if (!set->insertFromBlock(block))
return;
}
set->finishInsert();
res.in->readSuffix();
prepared_sets[set_key] = std::move(set);

View File

@ -75,7 +75,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty())
{
auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asynchronously */, context.getCurrentQueryId());
MutationsInterpreter(table, mutation_commands, context).validate(table_lock_holder);
MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder);
table->mutate(mutation_commands, context);
}

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int READONLY;
extern const int ILLEGAL_COLUMN;
extern const int DUPLICATE_COLUMN;
}
@ -84,6 +85,8 @@ Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const
if (!allow_materialized && !table_sample_non_materialized.has(current_name))
throw Exception("Cannot insert column " + current_name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
if (res.has(current_name))
throw Exception("Column " + current_name + " specified more than once", ErrorCodes::DUPLICATE_COLUMN);
res.insert(ColumnWithTypeAndName(table_sample.getByName(current_name).type, current_name));
}

View File

@ -92,6 +92,7 @@
#include <Processors/Transforms/FinishSortingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataStreams/materializeBlock.h>
#include <Processors/Pipe.h>
namespace DB
@ -285,8 +286,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
if (is_table_func)
{
/// Read from table function.
storage = context.getQueryContext().executeTableFunction(table_expression);
/// Read from the table function. Propagate all settings from initSettings();
/// the alternative is to call it on the current `context`, but that could potentially pollute it.
storage = getSubqueryContext(context).executeTableFunction(table_expression);
}
else
{
@ -1034,7 +1036,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (options.only_analyze)
{
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<NullSource>(source_header)});
pipeline.init(Pipe(std::make_shared<NullSource>(source_header)));
else
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
@ -1075,7 +1077,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (prepared_input)
{
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<SourceFromInputStream>(prepared_input)});
pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
else
pipeline.streams.push_back(prepared_input);
}
@ -1401,7 +1403,7 @@ void InterpreterSelectQuery::executeFetchColumns(
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<SourceFromInputStream>(istream)});
pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(istream)));
else
pipeline.streams.emplace_back(istream);
from_stage = QueryProcessingStage::WithMergeableState;
@ -1666,9 +1668,19 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.prewhere_info = prewhere_info;
query_info.sorting_info = sorting_info;
auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
BlockInputStreams streams;
Pipes pipes;
if (streams.empty())
/// Will work with pipes directly if the storage supports processors.
/// Code is temporarily copy-pasted while moving to the new pipeline.
bool use_pipes = pipeline_with_processors && storage->supportProcessorsPipeline();
if (use_pipes)
pipes = storage->readWithProcessors(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
else
streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
if (streams.empty() && !use_pipes)
{
streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
@ -1691,9 +1703,36 @@ void InterpreterSelectQuery::executeFetchColumns(
}
}
/// Copy-paste from the previous if.
if (pipes.empty() && use_pipes)
{
Pipe pipe(std::make_shared<NullSource>(storage->getSampleBlockForColumns(required_columns)));
if (query_info.prewhere_info)
{
pipe.addSimpleTransform(std::make_shared<FilterTransform>(
pipe.getHeader(),
prewhere_info->prewhere_actions,
prewhere_info->prewhere_column_name,
prewhere_info->remove_prewhere_column));
if (query_info.prewhere_info->remove_columns_actions)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
}
pipes.emplace_back(std::move(pipe));
}
for (auto & stream : streams)
stream->addTableLock(table_lock);
if constexpr (pipeline_with_processors)
{
/// Table lock is stored inside pipeline here.
if (use_pipes)
pipeline.addTableLock(table_lock);
}
/// Set the limits and quota for reading data, the speed and time of the query.
{
IBlockInputStream::LocalLimits limits;
@ -1728,11 +1767,21 @@ void InterpreterSelectQuery::executeFetchColumns(
if (options.to_stage == QueryProcessingStage::Complete)
stream->setQuota(quota);
}
/// Copy-paste
for (auto & pipe : pipes)
{
if (!options.ignore_limits)
pipe.setLimits(limits);
if (options.to_stage == QueryProcessingStage::Complete)
pipe.setQuota(quota);
}
}
if constexpr (pipeline_with_processors)
{
if (streams.size() == 1)
if (streams.size() == 1 || pipes.size() == 1)
pipeline.setMaxThreads(streams.size());
/// Unify streams. They must have same headers.
@ -1744,9 +1793,8 @@ void InterpreterSelectQuery::executeFetchColumns(
if (first_header.columns() > 1 && first_header.has("_dummy"))
first_header.erase("_dummy");
for (size_t i = 0; i < streams.size(); ++i)
for (auto & stream : streams)
{
auto & stream = streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
@ -1754,12 +1802,6 @@ void InterpreterSelectQuery::executeFetchColumns(
}
}
Processors sources;
sources.reserve(streams.size());
/// Pin sources for merge tree tables.
bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
for (auto & stream : streams)
{
bool force_add_agg_info = processing_stage == QueryProcessingStage::WithMergeableState;
@ -1768,13 +1810,18 @@ void InterpreterSelectQuery::executeFetchColumns(
if (processing_stage == QueryProcessingStage::Complete)
source->addTotalsPort();
if (pin_sources)
source->setStream(sources.size());
sources.emplace_back(std::move(source));
pipes.emplace_back(std::move(source));
}
pipeline.init(std::move(sources));
/// Pin sources for merge tree tables.
bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
if (pin_sources)
{
for (size_t i = 0; i < pipes.size(); ++i)
pipes[i].pinSources(i);
}
pipeline.init(std::move(pipes));
}
else
pipeline.streams = std::move(streams);

View File

@ -14,6 +14,7 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Pipe.h>
namespace DB
@ -236,7 +237,7 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
}
if (!has_main_pipeline)
main_pipeline.init({ std::make_shared<NullSource>(getSampleBlock()) });
main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));
if (!pipelines.empty())
{

View File

@ -89,10 +89,46 @@ std::optional<String> findFirstNonDeterministicFuncName(const MutationCommand &
return {};
}
ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands)
{
/// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
/// The result can differ from the number of affected rows (e.g. if there is an UPDATE command that
/// changes how many rows satisfy the predicates of the subsequent commands).
/// But we can be sure that if count = 0, then no rows will be touched.
auto select = std::make_shared<ASTSelectQuery>();
select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
auto count_func = std::make_shared<ASTFunction>();
count_func->name = "count";
count_func->arguments = std::make_shared<ASTExpressionList>();
select->select()->children.push_back(count_func);
if (commands.size() == 1)
select->setExpression(ASTSelectQuery::Expression::WHERE, commands[0].predicate->clone());
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "or";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
for (const MutationCommand & command : commands)
coalesced_predicates->arguments->children.push_back(command.predicate->clone());
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates));
}
return select;
}
};
bool MutationsInterpreter::isStorageTouchedByMutations() const
bool isStorageTouchedByMutations(
StoragePtr storage,
const std::vector<MutationCommand> & commands,
Context context_copy)
{
if (commands.empty())
return false;
@ -103,12 +139,16 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
return true;
}
auto context_copy = context;
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_copy.getSettingsRef().max_threads = 1;
const ASTPtr & select_query = prepareQueryAffectedAST();
BlockInputStreamPtr in = InterpreterSelectQuery(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits()).execute().in;
ASTPtr select_query = prepareQueryAffectedAST(commands);
/// The interpreter must be alive while we use the result of the execute() method.
/// For some reason it may copy the context and give it to ExpressionBlockInputStream;
/// after that we would use a context from a destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits());
BlockInputStreamPtr in = interpreter.execute().in;
Block block = in->read();
if (!block.rows())
@ -119,8 +159,23 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
auto count = (*block.getByName("count()").column)[0].get<UInt64>();
return count != 0;
}
MutationsInterpreter::MutationsInterpreter(
StoragePtr storage_,
std::vector<MutationCommand> commands_,
const Context & context_,
bool can_execute_)
: storage(std::move(storage_))
, commands(std::move(commands_))
, context(context_)
, can_execute(can_execute_)
{
mutation_ast = prepare(!can_execute);
auto limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits();
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, limits);
}
static NameSet getKeyColumns(const StoragePtr & storage)
{
@ -520,19 +575,18 @@ void MutationsInterpreter::validate(TableStructureReadLockHolder &)
}
}
const auto & select_query = prepare(/* dry_run = */ true);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ true).ignoreLimits()};
/// Do not use getSampleBlock in order to check the whole pipeline.
Block first_stage_header = interpreter.execute().in->getHeader();
Block first_stage_header = select_interpreter->execute().in->getHeader();
BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
addStreamsForLaterStages(stages, in)->getHeader();
}
BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &)
{
const auto & select_query = prepare(/* dry_run = */ false);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
BlockInputStreamPtr in = interpreter.execute().in;
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
BlockInputStreamPtr in = select_interpreter->execute().in;
auto result_stream = addStreamsForLaterStages(stages, in);
if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader());
@ -544,46 +598,14 @@ const Block & MutationsInterpreter::getUpdatedHeader() const
return *updated_header;
}
ASTPtr MutationsInterpreter::prepareQueryAffectedAST() const
{
/// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
/// The result can differ from the number of affected rows (e.g. if there is an UPDATE command that
/// changes how many rows satisfy the predicates of the subsequent commands).
/// But we can be sure that if count = 0, then no rows will be touched.
auto select = std::make_shared<ASTSelectQuery>();
select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
auto count_func = std::make_shared<ASTFunction>();
count_func->name = "count";
count_func->arguments = std::make_shared<ASTExpressionList>();
select->select()->children.push_back(count_func);
if (commands.size() == 1)
select->setExpression(ASTSelectQuery::Expression::WHERE, commands[0].predicate->clone());
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "or";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
for (const MutationCommand & command : commands)
coalesced_predicates->arguments->children.push_back(command.predicate->clone());
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates));
}
return select;
}
size_t MutationsInterpreter::evaluateCommandsSize()
{
for (const MutationCommand & command : commands)
if (unlikely(!command.predicate)) /// The command touches all rows.
return prepare(/* dry_run = */ true)->size();
return mutation_ast->size();
return std::max(prepareQueryAffectedAST()->size(), prepare(/* dry_run = */ true)->size());
return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size());
}
}

View File

@ -13,25 +13,22 @@ namespace DB
class Context;
/// Return false if the data isn't going to be changed by mutations.
bool isStorageTouchedByMutations(StoragePtr storage, const std::vector<MutationCommand> & commands, Context context_copy);
/// Create an input stream that will read data from storage and apply mutation commands (UPDATEs, DELETEs, MATERIALIZEs)
/// to this data.
class MutationsInterpreter
{
public:
MutationsInterpreter(StoragePtr storage_, std::vector<MutationCommand> commands_, const Context & context_)
: storage(std::move(storage_))
, commands(std::move(commands_))
, context(context_)
{
}
/// Storage to mutate, array of mutation commands and the context. If you really want to execute the mutation,
/// use can_execute = true; in other cases (validation, estimating the size of the commands) it can be false.
MutationsInterpreter(StoragePtr storage_, std::vector<MutationCommand> commands_, const Context & context_, bool can_execute_);
void validate(TableStructureReadLockHolder & table_lock_holder);
size_t evaluateCommandsSize();
/// Return false if the data isn't going to be changed by mutations.
bool isStorageTouchedByMutations() const;
/// The resulting stream will return blocks containing only the changed columns and the columns that we need to recalculate indices.
BlockInputStreamPtr execute(TableStructureReadLockHolder & table_lock_holder);
@ -43,13 +40,19 @@ private:
struct Stage;
ASTPtr prepareQueryAffectedAST() const;
ASTPtr prepareInterpreterSelectQuery(std::vector<Stage> &prepared_stages, bool dry_run);
BlockInputStreamPtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const;
StoragePtr storage;
std::vector<MutationCommand> commands;
const Context & context;
bool can_execute;
ASTPtr mutation_ast;
/// We have to store the interpreter because it uses its own copy of the context
/// and some streams from the execute() method may use it.
std::unique_ptr<InterpreterSelectQuery> select_interpreter;
/// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
/// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the

View File

@ -293,6 +293,7 @@ void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & co
Block block = header.cloneWithColumns(std::move(columns));
insertFromBlock(block);
finishInsert();
}

View File

@ -56,6 +56,10 @@ public:
/// Returns false if some limit was exceeded and there is no need to insert more data.
bool insertFromBlock(const Block & block);
/// Call after all blocks were inserted. Marks the set as completely created.
void finishInsert() { is_created = true; }
bool isCreated() const { return is_created; }
/** For columns of 'block', check belonging of corresponding rows to the set.
* Return UInt8 column with the result.
@ -111,6 +115,9 @@ private:
/// Do we need to additionally store all elements of the set in explicit form for subsequent use for index.
bool fill_set_elements;
/// Whether the set already contains all the data (i.e. creation is finished).
bool is_created = false;
/// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary(
const ColumnRawPtrs & key_columns,

View File

@ -32,7 +32,7 @@ std::shared_ptr<TSystemLog> createSystemLog(
String database = config.getString(config_prefix + ".database", default_database_name);
String table = config.getString(config_prefix + ".table", default_table_name);
String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)");
String engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024";
String engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time)";
size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS);

View File

@ -28,7 +28,7 @@ void ASTOrderByElement::formatImpl(const FormatSettings & settings, FormatState
if (with_fill)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH FILL " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH FILL" << (settings.hilite ? hilite_none : "");
if (fill_from)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : "");

View File

@ -44,8 +44,8 @@ static void checkSource(const IProcessor & source)
throw Exception("Source for pipe should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().size() != 1)
throw Exception("Source for pipe should have single output, but " + source.getName() + " has " +
if (source.getOutputs().size() > 2)
throw Exception("Source for pipe should have single or two outputs, but " + source.getName() + " has " +
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
@ -54,6 +54,10 @@ Pipe::Pipe(ProcessorPtr source)
{
checkSource(*source);
output_port = &source->getOutputs().front();
if (source->getOutputs().size() > 1)
totals = &source->getOutputs().back();
processors.emplace_back(std::move(source));
}
@ -84,4 +88,31 @@ void Pipe::addSimpleTransform(ProcessorPtr transform)
processors.emplace_back(std::move(transform));
}
void Pipe::setLimits(const ISourceWithProgress::LocalLimits & limits)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setLimits(limits);
}
}
void Pipe::setQuota(QuotaForIntervals & quota)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setQuota(quota);
}
}
void Pipe::pinSources(size_t executor_number)
{
for (auto & processor : processors)
{
if (auto * source = dynamic_cast<ISource *>(processor.get()))
source->setStream(executor_number);
}
}
}

View File

@ -1,4 +1,6 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
@ -6,6 +8,8 @@ namespace DB
class Pipe;
using Pipes = std::vector<Pipe>;
class QuotaForIntervals;
/// Pipe is a set of processors which represents the part of a pipeline with a single output.
/// All processors in pipe are connected. All ports are connected except the output one.
class Pipe
@ -33,9 +37,20 @@ public:
Processors detachProcessors() && { return std::move(processors); }
/// Specify quotas and limits for every ISourceWithProgress.
void setLimits(const SourceWithProgress::LocalLimits & limits);
void setQuota(QuotaForIntervals & quota);
/// Set information about preferred executor number for sources.
void pinSources(size_t executor_number);
void setTotalsPort(OutputPort * totals_) { totals = totals_; }
OutputPort * getTotalsPort() const { return totals; }
private:
Processors processors;
OutputPort * output_port = nullptr;
OutputPort * totals = nullptr;
};
}

View File

@ -14,7 +14,6 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
@ -48,36 +47,41 @@ void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_total
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::init(Processors sources)
void QueryPipeline::init(Pipe pipe)
{
Pipes pipes;
pipes.emplace_back(std::move(pipe));
init(std::move(pipes));
}
void QueryPipeline::init(Pipes pipes)
{
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
if (sources.empty())
throw Exception("Can't initialize pipeline with empty source list.", ErrorCodes::LOGICAL_ERROR);
if (pipes.empty())
throw Exception("Can't initialize pipeline with empty pipes list.", ErrorCodes::LOGICAL_ERROR);
std::vector<OutputPort *> totals;
for (auto & source : sources)
for (auto & pipe : pipes)
{
checkSource(source, true);
auto & header = source->getOutputs().front().getHeader();
auto & header = pipe.getHeader();
if (current_header)
assertBlocksHaveEqualStructure(current_header, header, "QueryPipeline");
else
current_header = header;
if (source->getOutputs().size() > 1)
if (auto * totals_port = pipe.getTotalsPort())
{
assertBlocksHaveEqualStructure(current_header, source->getOutputs().back().getHeader(), "QueryPipeline");
totals.emplace_back(&source->getOutputs().back());
assertBlocksHaveEqualStructure(current_header, totals_port->getHeader(), "QueryPipeline");
totals.emplace_back(totals_port);
}
/// source->setStream(streams.size());
streams.emplace_back(&source->getOutputs().front());
processors.emplace_back(std::move(source));
streams.emplace_back(&pipe.getPort());
auto cur_processors = std::move(pipe).detachProcessors();
processors.insert(processors.end(), cur_processors.begin(), cur_processors.end());
}
if (!totals.empty())

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Pipe.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
@ -11,7 +12,7 @@ namespace DB
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
using TableStructureReadLocks = std::vector<TableStructureReadLockHolder>;
class Context;
@ -22,8 +23,9 @@ class QueryPipeline
public:
QueryPipeline() = default;
/// Each source must have single output port and no inputs. All outputs must have same header.
void init(Processors sources);
/// All pipes must have the same header.
void init(Pipes pipes);
void init(Pipe pipe); /// Simple init for single pipe
bool initialized() { return !processors.empty(); }
enum class StreamType
@ -72,7 +74,7 @@ public:
const Block & getHeader() const { return current_header; }
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
/// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback);

View File

@ -114,10 +114,13 @@ Chunk SourceFromInputStream::generate()
stream->readSuffix();
if (auto totals_block = stream->getTotals())
{
if (totals_block.rows() == 1) /// Sometimes we can get empty totals. Skip it.
{
totals.setColumns(totals_block.getColumns(), 1);
has_totals = true;
}
}
is_stream_finished = true;
return {};

View File

@ -141,6 +141,9 @@ void CreatingSetsTransform::work()
auto finishCurrentSubquery = [&]()
{
if (subquery.set)
subquery.set->finishInsert();
if (table_out)
table_out->writeSuffix();

View File

@ -64,7 +64,8 @@ FilterTransform::FilterTransform(
IProcessor::Status FilterTransform::prepare()
{
if (constant_filter_description.always_false)
if (constant_filter_description.always_false
|| expression->checkColumnIsAlwaysFalse(filter_column_name))
{
input.close();
output.finish();
@ -83,18 +84,6 @@ void FilterTransform::removeFilterIfNeed(Chunk & chunk)
void FilterTransform::transform(Chunk & chunk)
{
if (!initialized)
{
initialized = true;
/// Cannot check this in prepare. Because in prepare columns for set may be not created yet.
if (expression->checkColumnIsAlwaysFalse(filter_column_name))
{
stopReading();
chunk = Chunk(getOutputPort().getHeader().getColumns(), 0);
return;
}
}
size_t num_rows_before_filtration = chunk.getNumRows();
auto columns = chunk.detachColumns();

View File

@ -36,8 +36,6 @@ private:
/// Header after expression, but before removing filter column.
Block transformed_header;
bool initialized = false;
void removeFilterIfNeed(Chunk & chunk);
};

View File

@ -268,6 +268,8 @@ public:
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual bool supportProcessorsPipeline() const { return false; }
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.
* Returns an object by which you can write data sequentially.

View File

@ -943,9 +943,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
command.partition, context_for_reading);
});
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading);
if (!mutations_interpreter.isStorageTouchedByMutations())
if (!isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading))
{
LOG_TRACE(log, "Part " << source_part->name << " doesn't change up to mutation version " << future_part.part_info.mutation);
return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info);
@ -973,6 +972,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
Poco::File(new_part_tmp_path).createDirectories();
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
auto in = mutations_interpreter.execute(table_lock_holder);
const auto & updated_header = mutations_interpreter.getUpdatedHeader();

View File

@ -216,7 +216,10 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & relative_data_path_,
bool attach_)
: table_name(table_name_), database_name(database_name_),
: IStorage(ColumnsDescription({
{"_shard_num", std::make_shared<DataTypeUInt32>()},
}, true)),
table_name(table_name_), database_name(database_name_),
remote_database(remote_database_), remote_table(remote_table_),
global_context(context_), cluster_name(global_context.getMacros()->expand(cluster_name_)), has_sharding_key(sharding_key_),
path(relative_data_path_.empty() ? "" : (context_.getPath() + relative_data_path_))
@ -305,7 +308,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
}
BlockInputStreams StorageDistributed::read(
const Names & /*column_names*/,
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
@ -324,11 +327,15 @@ BlockInputStreams StorageDistributed::read(
const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num"))
has_virtual_shard_num_column = false;
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, scalars, context.getExternalTables())
header, processed_stage, remote_table_function_ptr, scalars, has_virtual_shard_num_column, context.getExternalTables())
: ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, context.getExternalTables());
header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());
if (settings.optimize_skip_unused_shards)
{

View File

@ -55,6 +55,7 @@ private:
HashJoinPtr join;
void insertBlock(const Block & block) override;
void finishInsert() override {}
size_t getSize() const override;
protected:

View File

@ -690,7 +690,7 @@ bool StorageMergeTree::tryMutatePart()
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context);
MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context, false);
size_t commands_size = interpreter.evaluateCommandsSize();
if (current_ast_elements + commands_size >= max_ast_elements)

View File

@ -45,6 +45,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@ -97,6 +97,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@ -70,6 +70,7 @@ void SetOrJoinBlockOutputStream::write(const Block & block)
void SetOrJoinBlockOutputStream::writeSuffix()
{
table.finishInsert();
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
@ -126,6 +127,7 @@ StorageSet::StorageSet(
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); }
void StorageSet::finishInsert() { set->finishInsert(); }
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
@ -183,8 +185,11 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
NativeBlockInputStream backup_stream(compressed_backup_buf, 0);
backup_stream.readPrefix();
while (Block block = backup_stream.read())
insertBlock(block);
finishInsert();
backup_stream.readSuffix();
/// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.

View File

@ -58,6 +58,8 @@ private:
/// Insert the block into the state.
virtual void insertBlock(const Block & block) = 0;
/// Call after all blocks were inserted.
virtual void finishInsert() = 0;
virtual size_t getSize() const = 0;
};
@ -83,6 +85,7 @@ private:
SetPtr set;
void insertBlock(const Block & block) override;
void finishInsert() override;
size_t getSize() const override;
protected:

View File

@ -105,7 +105,7 @@ namespace
template <typename BridgeHelperMixin>
void registerXDBCStorage(StorageFactory & factory, const std::string & name)
{
factory.registerStorage(name, [&name](const StorageFactory::Arguments & args)
factory.registerStorage(name, [name](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;

View File

@ -11,6 +11,7 @@ const char * auto_contributors[] {
"Alex Ryndin",
"Alex Zatelepin",
"Alexander Avdonkin",
"Alexander Burmak",
"Alexander Ermolaev",
"Alexander GQ Gerasiov",
"Alexander Kazakov",
@ -49,6 +50,7 @@ const char * auto_contributors[] {
"Andrew Grigorev",
"Andrey",
"Andrey Dudin",
"Andrey Konyaev",
"Andrey M",
"Andrey Mironov",
"Andrey Urusov",
@ -86,6 +88,7 @@ const char * auto_contributors[] {
"Chen Yufei",
"Ciprian Hacman",
"Clément Rodriguez",
"Colum",
"Constantin S. Pan",
"CurtizJ",
"Daniel Bershatsky",
@ -95,6 +98,7 @@ const char * auto_contributors[] {
"DarkWanderer",
"Darío",
"Denis Burlaka",
"Denis Glazachev",
"Denis Zhuravlev",
"Derek Perkins",
"Dmitry Bilunov",
@ -110,6 +114,7 @@ const char * auto_contributors[] {
"Elghazal Ahmed",
"Emmanuel Donin de Rosière",
"Eric",
"Ernest Poletaev",
"Eugene Klimov",
"Eugene Konkov",
"Evgenii Pravda",
@ -145,6 +150,7 @@ const char * auto_contributors[] {
"Ilya",
"Ilya Breev",
"Ilya Khomutov",
"Ilya Korol",
"Ilya Korolev",
"Ilya Kovalenko",
"Ilya Shipitsin",
@ -341,6 +347,7 @@ const char * auto_contributors[] {
"Zhichang Yu",
"abdrakhmanov",
"abyss7",
"achimbab",
"achulkov2",
"akazz",
"akonyaev",
@ -350,6 +357,7 @@ const char * auto_contributors[] {
"alex.lvxin",
"alexander kozhikhov",
"alexey-milovidov",
"andrei-karpliuk",
"andrewsg",
"anrodigina",
"anton",
@ -411,6 +419,7 @@ const char * auto_contributors[] {
"levysh",
"liangqian",
"linceyou",
"liu-bov",
"liuyangkuan",
"liuyimin",
"lomberts",
@ -419,6 +428,7 @@ const char * auto_contributors[] {
"malkfilipp",
"maqroll",
"maxkuzn",
"memo",
"mf5137",
"mfridental",
"miha-g",
@ -439,6 +449,7 @@ const char * auto_contributors[] {
"pyos",
"qianlixiang",
"quid",
"rainbowsysu",
"robot-clickhouse",
"robot-metrika-test",
"root",

View File

@ -61,13 +61,7 @@ ASTPtr buildWhereExpression(const ASTs & functions)
return nullptr;
if (functions.size() == 1)
return functions[0];
ASTPtr new_query = std::make_shared<ASTFunction>();
auto & new_function = new_query->as<ASTFunction &>();
new_function.name = "and";
new_function.arguments = std::make_shared<ASTExpressionList>();
new_function.arguments->children = functions;
new_function.children.push_back(new_function.arguments);
return new_query;
return makeASTFunction("and", functions);
}
}
@ -87,6 +81,32 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
select.with()->children.push_back(literal);
}
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value, const String & func)
{
auto & select = ast->as<ASTSelectQuery &>();
if (!select.with())
select.setExpression(ASTSelectQuery::Expression::WITH, std::make_shared<ASTExpressionList>());
if (func.empty())
{
auto literal = std::make_shared<ASTLiteral>(value);
literal->alias = column_name;
literal->prefer_alias_to_column_name = true;
select.with()->children.push_back(literal);
}
else
{
auto literal = std::make_shared<ASTLiteral>(value);
literal->prefer_alias_to_column_name = true;
auto function = makeASTFunction(func, literal);
function->alias = column_name;
function->prefer_alias_to_column_name = true;
select.with()->children.push_back(function);
}
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context)
{
const auto & select = query->as<ASTSelectQuery &>();

View File

@ -16,9 +16,13 @@ class NamesAndTypesList;
namespace VirtualColumnUtils
{
/// Adds to the select query section `select column_name as value`
/// For example select _port as 9000.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value);
/// Adds to the select query section `WITH value AS column_name`, and uses func
/// to wrap the value (if any)
///
/// For example:
/// - `WITH 9000 as _port`.
/// - `WITH toUInt16(9000) as _port`.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value, const String & func = "");
/// Leave in the block only the rows that fit under the WHERE clause and the PREWHERE clause of the query.
/// Only elements of the outer conjunction are considered, depending only on the columns present in the block.

View File

@ -16,6 +16,7 @@
#include <Poco/Net/HTTPRequest.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Poco/NumberFormatter.h>
namespace DB
@ -70,6 +71,10 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
columns_info_uri.addQueryParameter("schema", schema_name);
columns_info_uri.addQueryParameter("table", remote_table_name);
const auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
columns_info_uri.addQueryParameter("external_table_functions_use_nulls",
Poco::NumberFormatter::format(use_nulls));
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, nullptr);
std::string columns_info;

View File

@ -502,13 +502,13 @@ if __name__ == '__main__':
parser.add_argument('--no-stateful', action='store_true', help='Disable all stateful tests')
parser.add_argument('--skip', nargs='+', help="Skip these tests")
parser.add_argument('--no-long', action='store_false', dest='no_long', help='Do not run long tests')
parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
group=parser.add_mutually_exclusive_group(required=False)
group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')
group=parser.add_mutually_exclusive_group(required=False)
group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to clickhouse-server listen 127.0.0.2 127.0.0.3)')
group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests')
group.add_argument('--client-option', nargs='+', help='Specify additional client argument')
args = parser.parse_args()

View File

@ -4,6 +4,9 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from helpers.network import PartitionManager
from multiprocessing.dummy import Pool
import random
"""
Both ssl_conf.xml and no_ssl_conf.xml have the same port
@ -46,6 +49,35 @@ def test_both_https(both_https_cluster):
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", '111\n222')
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", '111\n222')
def test_replication_after_partition(both_https_cluster):
node1.query("truncate table test_table")
node2.query("truncate table test_table")
manager = PartitionManager()
def close(num):
manager.partition_instances(node1, node2, port=9010)
time.sleep(1)
manager.heal_all()
def insert_data_and_check(num):
node1.query("insert into test_table values('2019-10-15', {}, 888)".format(num))
time.sleep(0.5)
closing_pool = Pool(1)
inserting_pool = Pool(5)
cres = closing_pool.map_async(close, [random.randint(1, 3) for _ in range(10)])
ires = inserting_pool.map_async(insert_data_and_check, range(100))
cres.wait()
ires.wait()
assert_eq_with_retry(node1, "SELECT count() FROM test_table", '100')
assert_eq_with_retry(node2, "SELECT count() FROM test_table", '100')
node3 = cluster.add_instance('node3', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/no_ssl_conf.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/no_ssl_conf.xml'], with_zookeeper=True)

View File

@ -91,7 +91,8 @@ def test_mysql_simple_select_works(started_cluster):
with conn.cursor() as cursor:
cursor.execute("INSERT INTO clickhouse.{} VALUES(50, 'null-guy', 127, 255, NULL), (100, 'non-null-guy', 127, 255, 511);".format(table_name))
conn.commit()
assert node1.query("SELECT column_x FROM odbc('DSN={}', '{}')".format(mysql_setup["DSN"], table_name)) == '\\N\n511\n'
assert node1.query("SELECT column_x FROM odbc('DSN={}', '{}') SETTINGS external_table_functions_use_nulls=1".format(mysql_setup["DSN"], table_name)) == '\\N\n511\n'
assert node1.query("SELECT column_x FROM odbc('DSN={}', '{}') SETTINGS external_table_functions_use_nulls=0".format(mysql_setup["DSN"], table_name)) == '0\n511\n'
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, column_x Nullable(UInt32)) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');

View File

@ -0,0 +1,21 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:19.1.14', with_installed_binary=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_different_versions(start_cluster):
assert node1.query("SELECT uniqExact(x) FROM (SELECT version() as x from remote('node{1,2}', system.one))") == "2\n"

View File

@ -0,0 +1,17 @@
<test>
<type>once</type>
<stop_conditions>
<any_of>
<average_speed_not_changing_for_ms>5000</average_speed_not_changing_for_ms>
<total_time_ms>10000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<max_rows_per_second />
</main_metric>
<query>SELECT count() FROM (SELECT range(number % 100) FROM system.numbers limit 10000000)</query>
<query>SELECT count() FROM (SELECT range(0, number % 100, 1) FROM system.numbers limit 10000000)</query>
</test>

View File

@ -0,0 +1,34 @@
<test>
<type>loop</type>
<stop_conditions>
<any_of>
<iterations>10</iterations>
</any_of>
</stop_conditions>
<main_metric>
<rows_per_second />
</main_metric>
<preconditions>
<table_exists>default.hits_10m_single</table_exists>
</preconditions>
<create_query>CREATE TABLE hits_10m_words (word String, UserID UInt64) ENGINE Memory</create_query>
<create_query>CREATE TABLE strings (short String, long String) ENGINE Memory</create_query>
<fill_query> INSERT INTO hits_10m_words SELECT DISTINCT arrayJoin(splitByString(' ', SearchPhrase)) AS word, UserID FROM hits_10m_single WHERE length(word) > 0</fill_query>
<fill_query> INSERT INTO strings SELECT toString(rand()) a, a || a || a || a || a || a || a || a || a || a || a || a FROM numbers(1000000)</fill_query>
<settings>
<max_threads>1</max_threads>
</settings>
<query>SELECT 1 FROM hits_10m_words AS l ANY LEFT JOIN hits_10m_words AS r USING (word) FORMAT Null</query>
<query>SELECT 1 FROM strings AS l ANY LEFT JOIN strings AS r USING (short) FORMAT Null</query>
<query>SELECT 1 FROM strings AS l ANY LEFT JOIN strings AS r USING (long) FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS hits_10m_words</drop_query>
<drop_query>DROP TABLE IF EXISTS strings</drop_query>
</test>

View File

@ -0,0 +1,35 @@
<test>
<type>loop</type>
<stop_conditions>
<all_of>
<total_time_ms>30000</total_time_ms>
</all_of>
<any_of>
<average_speed_not_changing_for_ms>6000</average_speed_not_changing_for_ms>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<min_time/>
</main_metric>
<settings>
<max_threads>1</max_threads>
</settings>
<create_query>CREATE TABLE array_data(k UInt16, v Array(UInt64)) ENGINE Log</create_query>
<fill_query>INSERT INTO array_data SELECT number % 1024, arrayWithConstant(16, number) from numbers(10000000)</fill_query>
<query>SELECT countMerge(v) FROM (SELECT countState() v FROM numbers(1000000000)) FORMAT Null</query>
<query>SELECT countMerge(v) FROM (SELECT number % 1024 k, countState() v FROM numbers(1000000000) GROUP BY k) FORMAT Null</query>
<query>SELECT sumArray(v) FROM array_data FORMAT Null</query>
<query>SELECT k, sumArray(v) FROM array_data GROUP BY k FORMAT Null</query>
<query>SELECT arrayReduce('avg', v) FROM array_data FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS array_data</drop_query>
</test>

View File

@ -1,3 +1,6 @@
SELECT range(100) == range(0, 100) and range(0, 100) == range(0, 100, 1);
SELECT distinct length(range(number, number + 100, 99)) == 2 FROM numbers(1000);
SELECT distinct length(range(number, number + 100, 100)) == 1 FROM numbers(1000);
SELECT range(0)[-1];
SELECT range(0)[1];
SELECT range(number)[2] FROM system.numbers LIMIT 10;

View File

@ -1,4 +1,7 @@
96354
-676697544
138768
-2143570108
2145564783
96354
1470786104

View File

@ -1,4 +1,7 @@
select javaHash('abc');
select javaHash('874293087');
select javaHashUTF16LE(convertCharset('a1가', 'utf-8', 'utf-16le'));
select javaHashUTF16LE(convertCharset('가나다라마바사아자차카타파하', 'utf-8', 'utf-16le'));
select javaHashUTF16LE(convertCharset('FJKLDSJFIOLD_389159837589429', 'utf-8', 'utf-16le'));
select hiveHash('abc');
select hiveHash('874293087');

View File

@ -1,4 +1,4 @@
SELECT substring('hello', []); -- { serverError 43 }
SELECT substring('hello', 1, []); -- { serverError 43 }
SELECT substring(materialize('hello'), -1, -1); -- { serverError 69 }
SELECT substring(materialize('hello'), -1, -1);
SELECT substring(materialize('hello'), 0); -- { serverError 135 }

View File

@ -0,0 +1,11 @@
64000000
64000000
42020000
B61BFEFFFFFFFFFF
EF260000000000000000000000000000
400D0300
28110300
403A340100000000
E0C0350100000000
00B08EF01B0000000000000000000000
007A292C1C0000000000000000000000

View File

@ -0,0 +1,8 @@
SELECT hex(toDecimal32(1.0, 2));
SELECT hex(toDecimal32(1., 2));
SELECT hex(toDecimal32(0.000578, 6));
SELECT hex(toDecimal64(-123.978, 3));
SELECT hex(toDecimal128(99.67, 2));
SELECT hex(toDecimal32(number, 3)) FROM numbers(200, 2);
SELECT hex(toDecimal64(number, 5)) FROM numbers(202, 2);
SELECT hex(toDecimal128(number, 9)) FROM numbers(120, 2);

View File

@ -0,0 +1,36 @@
remote(system.one)
0
0
0
1 0
1 0
2 0
1 0
dist_1
1
1 10
10
1
1
1 10
1 20
10
20
dist_2
1
2
1 100
2 100
100
100
remote(Distributed)
1 100
1 100
JOIN system.clusters
1 10 localhost ::1 9000
1 20 localhost ::1 9000
1 10 localhost ::1 9000
1 20 localhost ::1 9000
dist_3
100 foo
foo 100 foo

View File

@ -0,0 +1,71 @@
-- make the order static
SET max_threads = 1;
-- remote(system.one)
SELECT 'remote(system.one)';
SELECT * FROM remote('127.0.0.1', system.one);
SELECT * FROM remote('127.0.0.{1,2}', system.one);
SELECT _shard_num, * FROM remote('127.0.0.1', system.one);
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one);
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one) WHERE _shard_num = 1;
-- dist_1 using test_shard_localhost
SELECT 'dist_1';
CREATE TABLE mem1 (key Int) Engine=Memory();
CREATE TABLE dist_1 AS mem1 Engine=Distributed(test_shard_localhost, currentDatabase(), mem1);
SELECT _shard_num FROM dist_1;
INSERT INTO mem1 VALUES (10);
SELECT _shard_num FROM dist_1;
SELECT _shard_num, key FROM dist_1;
SELECT key FROM dist_1;
INSERT INTO dist_1 VALUES (20);
SELECT _shard_num FROM dist_1;
SELECT _shard_num, key FROM dist_1;
SELECT key FROM dist_1;
-- dist_2 using test_cluster_two_shards_localhost
SELECT 'dist_2';
CREATE TABLE mem2 (key Int) Engine=Memory();
CREATE TABLE dist_2 AS mem2 Engine=Distributed(test_cluster_two_shards_localhost, currentDatabase(), mem2);
SELECT _shard_num FROM dist_2;
INSERT INTO mem2 VALUES (100);
SELECT _shard_num FROM dist_2;
SELECT _shard_num, key FROM dist_2;
SELECT key FROM dist_2;
-- multiple _shard_num
SELECT 'remote(Distributed)';
SELECT _shard_num, key FROM remote('127.0.0.1', currentDatabase(), dist_2);
-- JOIN system.clusters
SELECT 'JOIN system.clusters';
SELECT a._shard_num, a.key, b.host_name, b.host_address, b.port
FROM (SELECT *, _shard_num FROM dist_1) a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost';
SELECT _shard_num, key, b.host_name, b.host_address, b.port
FROM dist_1 a
JOIN system.clusters b
ON _shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost';
-- rewrite does not work with aliases, hence Missing columns (47)
SELECT a._shard_num, key FROM dist_1 a; -- { serverError 47; }
-- the same with JOIN, just in case
SELECT a._shard_num, a.key, b.host_name, b.host_address, b.port
FROM dist_1 a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 47; }
SELECT 'dist_3';
CREATE TABLE mem3 (key Int, _shard_num String) Engine=Memory();
CREATE TABLE dist_3 AS mem3 Engine=Distributed(test_shard_localhost, currentDatabase(), mem3);
INSERT INTO mem3 VALUES (100, 'foo');
SELECT * FROM dist_3;
SELECT _shard_num, * FROM dist_3;

View File

@ -0,0 +1,28 @@
-- Create dictionary, since dictGet*() uses DB::Context in executeImpl()
-- (To cover scope of the Context in DB::PushingToViewsBlockOutputStream::process)
DROP DATABASE IF EXISTS dict_in_01023;
CREATE DATABASE dict_in_01023;
CREATE TABLE dict_in_01023.input (key UInt64, val UInt64) Engine=Memory();
CREATE DICTIONARY dict_in_01023.dict
(
key UInt64 DEFAULT 0,
val UInt64 DEFAULT 1
)
PRIMARY KEY key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'input' PASSWORD '' DB 'dict_in_01023'))
LIFETIME(MIN 0 MAX 0)
LAYOUT(HASHED());
CREATE TABLE input (key UInt64) Engine=Distributed(test_shard_localhost, currentDatabase(), buffer_, key);
CREATE TABLE null_ (key UInt64) Engine=Null();
CREATE TABLE buffer_ (key UInt64) Engine=Buffer(currentDatabase(), dist_out, 1, 0, 0, 0, 0, 0, 0);
CREATE TABLE dist_out (key UInt64) Engine=Distributed(test_shard_localhost, currentDatabase(), null_, key);
CREATE TABLE output (key UInt64, val UInt64) Engine=Memory();
CREATE MATERIALIZED VIEW mv TO output AS SELECT key, dictGetUInt64('dict_in_01023.dict', 'val', key) val FROM dist_out;
INSERT INTO input VALUES (1);
SELECT count() FROM output;

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
. $CURDIR/mergetree_mutations.lib
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS json_test"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE json_test (id UInt32, metadata String) ENGINE = MergeTree() ORDER BY id"
${CLICKHOUSE_CLIENT} --query="INSERT INTO json_test VALUES (1, '{\"date\": \"2018-01-01\", \"task_id\": \"billing_history__billing_history.load_history_payments_data__20180101\"}'), (2, '{\"date\": \"2018-01-02\", \"task_id\": \"billing_history__billing_history.load_history_payments_data__20180101\"}')"
${CLICKHOUSE_CLIENT} --query="SELECT COUNT() FROM json_test"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE json_test DELETE WHERE JSONExtractString(metadata, 'date') = '2018-01-01'"
wait_for_mutation "json_test" "mutation_2.txt"
${CLICKHOUSE_CLIENT} --query="SELECT COUNT() FROM json_test"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS json_test"

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS sometable;
CREATE TABLE sometable (
date Date,
time Int64,
value UInt64
) ENGINE=MergeTree()
ORDER BY time;
INSERT INTO sometable (date, time, value) VALUES ('2019-11-08', 1573185600, 100);
SELECT COUNT() from sometable;
INSERT INTO sometable (date, time, value, time) VALUES ('2019-11-08', 1573185600, 100, 1573185600); -- {serverError 15}
DROP TABLE IF EXISTS sometable;

View File

@ -0,0 +1 @@
CREATE TABLE default.BannerDict (`BannerID` UInt64, `CompaignID` UInt64) ENGINE = ODBC(\'DSN=pgconn;Database=postgres\', somedb, bannerdict)

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS BannerDict;
CREATE TABLE BannerDict (`BannerID` UInt64, `CompaignID` UInt64) ENGINE = ODBC('DSN=pgconn;Database=postgres', bannerdict); -- {serverError 42}
CREATE TABLE BannerDict (`BannerID` UInt64, `CompaignID` UInt64) ENGINE = ODBC('DSN=pgconn;Database=postgres', somedb, bannerdict);
SHOW CREATE TABLE BannerDict;
DROP TABLE IF EXISTS BannerDict;

View File

@ -0,0 +1,8 @@
lickhous
lickhous
lickhous
lickhous
lickhous
lickhous
lickhous
lickhous

View File

@ -0,0 +1,8 @@
SELECT substr('clickhouse', 2, -2);
SELECT substr(materialize('clickhouse'), 2, -2);
SELECT substr('clickhouse', materialize(2), -2);
SELECT substr(materialize('clickhouse'), materialize(2), -2);
SELECT substr('clickhouse', 2, materialize(-2));
SELECT substr(materialize('clickhouse'), 2, materialize(-2));
SELECT substr('clickhouse', materialize(2), materialize(-2));
SELECT substr(materialize('clickhouse'), materialize(2), materialize(-2));

View File

@ -1,6 +1,6 @@
SET max_bytes_before_external_group_by = 200000000;
SET max_memory_usage = 1000000000;
SET max_memory_usage = 1500000000;
SET max_threads = 12;
SELECT URL, uniq(SearchPhrase) AS u FROM test.hits GROUP BY URL ORDER BY u DESC, URL LIMIT 10;

debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (19.17.1.1) unstable; urgency=low
clickhouse (19.18.1.1) unstable; urgency=low
* Modified source code
-- clickhouse-release <clickhouse-release@yandex-team.ru> Mon, 21 Oct 2019 15:47:56 +0300
-- clickhouse-release <clickhouse-release@yandex-team.ru> Fri, 08 Nov 2019 11:36:37 +0300

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.17.1.*
ARG version=19.18.1.*
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.17.1.*
ARG version=19.18.1.*
ARG gosu_ver=1.10
RUN apt-get update \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.17.1.*
ARG version=19.18.1.*
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -27,7 +27,7 @@
- [HTTP-ClickHouse](https://metacpan.org/release/HTTP-ClickHouse)
- [AnyEvent-ClickHouse](https://metacpan.org/release/AnyEvent-ClickHouse)
- Ruby
- [clickhouse (Ruby)](https://github.com/archan937/clickhouse)
- [ClickHouse (Ruby)](https://github.com/shlima/click_house)
- R
- [clickhouse-r](https://github.com/hannesmuehleisen/clickhouse-r)
- [RClickhouse](https://github.com/IMSMWU/RClickhouse)

View File

@ -338,6 +338,7 @@ Columns:
- `table` (`String`) Name of the table.
- `engine` (`String`) Name of the table engine without parameters.
- `path` (`String`) Absolute path to the folder with data part files.
- `disk` (`String`) Name of a disk that stores the data part.
- `hash_of_all_files` (`String`) [sipHash128](../query_language/functions/hash_functions.md#hash_functions-siphash128) of compressed files.
- `hash_of_uncompressed_files` (`String`) [sipHash128](../query_language/functions/hash_functions.md#hash_functions-siphash128) of uncompressed files (files with marks, index file etc.).
- `uncompressed_hash_of_compressed_files` (`String`) [sipHash128](../query_language/functions/hash_functions.md#hash_functions-siphash128) of data in the compressed files as if they were uncompressed.
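For example, a query along the following lines shows which disk each active part of a table is stored on (a sketch; `default` and `hits` are placeholder database and table names):

```sql
SELECT name, disk, path
FROM system.parts
WHERE active AND database = 'default' AND table = 'hits';
```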
@ -354,11 +355,12 @@ This table contains information about events that occurred with [data parts](tab
The `system.part_log` table contains the following columns:
- `event_type` (Enum) — Type of the event that occurred with the data part. Can have one of the following values:
- `NEW_PART` — inserting
- `MERGE_PARTS` — merging
- `DOWNLOAD_PART` — downloading
- `REMOVE_PART` — removing or detaching using [DETACH PARTITION](../query_language/alter.md#alter_detach-partition)
- `MUTATE_PART` — updating.
- `NEW_PART` — Inserting of a new data part.
- `MERGE_PARTS` — Merging of data parts.
- `DOWNLOAD_PART` — Downloading a data part.
- `REMOVE_PART` — Removing or detaching a data part using [DETACH PARTITION](../query_language/alter.md#alter_detach-partition).
- `MUTATE_PART` — Mutating of a data part.
- `MOVE_PART` — Moving a data part from one disk to another.
- `event_date` (Date) — Event date.
- `event_time` (DateTime) — Event time.
- `duration_ms` (UInt64) — Duration.
@ -761,6 +763,30 @@ If there were problems with mutating some parts, the following columns contain a
## system.disks {#system_tables-disks}
Contains information about disks defined in the [server configuration](table_engines/mergetree.md#table_engine-mergetree-multiple-volumes_configure).
Columns:
- `name` ([String](../data_types/string.md)) — Name of a disk in the server configuration.
- `path` ([String](../data_types/string.md)) — Path to the mount point in the file system.
- `free_space` ([UInt64](../data_types/int_uint.md)) — Free space on disk in bytes.
- `total_space` ([UInt64](../data_types/int_uint.md)) — Disk volume in bytes.
- `keep_free_space` ([UInt64](../data_types/int_uint.md)) — Amount of disk space that should stay free on disk in bytes. Defined in the `keep_free_space_bytes` parameter of disk configuration.
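The table can be inspected directly; for example (a sketch, the output depends on the disks configured on the server):

```sql
SELECT name, path, formatReadableSize(free_space) AS free, formatReadableSize(total_space) AS total
FROM system.disks;
```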
## system.storage_policies {#system_tables-storage_policies}
Contains information about storage policies and volumes defined in the [server configuration](table_engines/mergetree.md#table_engine-mergetree-multiple-volumes_configure).
Columns:
- `policy_name` ([String](../data_types/string.md)) — Name of the storage policy.
- `volume_name` ([String](../data_types/string.md)) — Volume name defined in the storage policy.
- `volume_priority` ([UInt64](../data_types/int_uint.md)) — Volume order number in the configuration.
- `disks` ([Array(String)](../data_types/array.md)) — Disk names, defined in the storage policy.
- `max_data_part_size` ([UInt64](../data_types/int_uint.md)) — Maximum size of a data part that can be stored on volume disks (0 — no limit).
- `move_factor` ([Float64](../data_types/float.md)) — Ratio of free disk space. When the amount of free space gets lower than this ratio, ClickHouse starts to move data to the next volume in order.
If the storage policy contains more than one volume, then the information for each volume is stored in an individual row of the table.
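For example, the following query shows how volumes are laid out for each configured policy (a sketch; the values depend on the server configuration):

```sql
SELECT policy_name, volume_name, volume_priority, disks, max_data_part_size, move_factor
FROM system.storage_policies
ORDER BY policy_name, volume_priority;
```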
[Original article](https://clickhouse.yandex/docs/en/operations/system_tables/) <!--hide-->

View File

@ -121,5 +121,16 @@ If the server ceased to exist or had a rough restart (for example, after a devic
When the max_parallel_replicas option is enabled, query processing is parallelized across all replicas within a single shard. For more information, see the section [max_parallel_replicas](../settings/settings.md#settings-max_parallel_replicas).
## Virtual Columns
- `_shard_num` — Contains the `shard_num` (from `system.clusters`). Type: [UInt32](../../data_types/int_uint.md).
!!! note "Note"
Since [`remote`](../../query_language/table_functions/remote.md)/`cluster` table functions internally create a temporary instance of the same Distributed engine, `_shard_num` is available there too.
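For example (the addresses here are local loopback replicas, as used in the functional tests for this column):

```sql
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one);
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one) WHERE _shard_num = 1;
```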
**See Also**
- [Virtual columns](index.md#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/distributed/) <!--hide-->

View File

@ -85,7 +85,9 @@ For a description of parameters, see the [CREATE query description](../../query_
- `min_merge_bytes_to_use_direct_io` — The minimum data volume for merge operation that is required for using direct I/O access to the storage disk. When merging data parts, ClickHouse calculates the total storage volume of all the data to be merged. If the volume exceeds `min_merge_bytes_to_use_direct_io` bytes, ClickHouse reads and writes the data to the storage disk using the direct I/O interface (`O_DIRECT` option). If `min_merge_bytes_to_use_direct_io = 0`, then direct I/O is disabled. Default value: `10 * 1024 * 1024 * 1024` bytes.
<a name="mergetree_setting-merge_with_ttl_timeout"></a>
- `merge_with_ttl_timeout` — Minimum delay in seconds before repeating a merge with TTL. Default value: 86400 (1 day).
- `write_final_mark` — Enables or disables writing the final index mark at the end of the data part. Default value: 1. Don't turn it off.
- `write_final_mark` — Enables or disables writing the final index mark at the end of data part. Default value: 1. Don't turn it off.
- `storage_policy` — Storage policy. See [Using Multiple Block Devices for Data Storage](#table_engine-mergetree-multiple-volumes).
**Example of Sections Setting**
@ -462,53 +464,89 @@ If you perform the `SELECT` query between merges, you may get expired data. To a
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/mergetree/) <!--hide-->
## Using multiple block devices for data storage {#table_engine-mergetree-multiple-volumes}
## Using Multiple Block Devices for Data Storage {#table_engine-mergetree-multiple-volumes}
### General
### Introduction
Tables of the MergeTree family are able to store their data on multiple block devices, which may be useful when, for instance, the data of a certain table are implicitly split into "hot" and "cold". The most recent data is regularly requested but requires only a small amount of space. On the contrary, the fat-tailed historical data is requested rarely. If several disks are available, the "hot" data may be located on fast disks (NVMe SSDs or even in memory), while the "cold" data - on relatively slow ones (HDD).
`MergeTree` family table engines can store data on multiple block devices. For example, it can be useful when the data of a certain table are implicitly split into "hot" and "cold". The most recent data is regularly requested but requires only a small amount of space. On the contrary, the fat-tailed historical data is requested rarely. If several disks are available, the "hot" data may be located on fast disks (for example, NVMe SSDs or in memory), while the "cold" data - on relatively slow ones (for example, HDD).
Part is the minimum movable unit for MergeTree tables. The data belonging to one part are stored on one disk. Parts can be moved between disks in the background (according to user settings) as well as by means of the [ALTER](../../query_language/alter.md#alter_move-partition) queries.
Data part is the minimum movable unit for `MergeTree`-engine tables. The data belonging to one part are stored on one disk. Data parts can be moved between disks in the background (according to user settings) as well as by means of the [ALTER](../../query_language/alter.md#alter_move-partition) queries.
### Terms
* Disk — a block device mounted to the filesystem.
* Default disk — a disk that contains the path specified in the `<path>` tag in `config.xml`.
* Volume — an ordered set of equal disks (similar to [JBOD](https://en.wikipedia.org/wiki/Non-RAID_drive_architectures)).
* Storage policy — a number of volumes together with the rules for moving data between them.
The names given to the described entities can be found in the system tables, [system.storage_policies](../system_tables.md#system_tables-storage_policies) and [system.disks](../system_tables.md#system_tables-disks). Storage policy name can be used as a parameter for tables of the MergeTree family.
- Disk — Block device mounted to the filesystem.
- Default disk — Disk that stores the path specified in the [path](../server_settings/settings.md#server_settings-path) server setting.
- Volume — Ordered set of equal disks (similar to [JBOD](https://en.wikipedia.org/wiki/Non-RAID_drive_architectures)).
- Storage policy — Set of volumes and the rules for moving data between them.
The names given to the described entities can be found in the system tables, [system.storage_policies](../system_tables.md#system_tables-storage_policies) and [system.disks](../system_tables.md#system_tables-disks). To apply one of the configured storage policies for a table, use the `storage_policy` setting of `MergeTree`-engine family tables.
### Configuration {#table_engine-mergetree-multiple-volumes_configure}
Disks, volumes and storage policies should be declared inside the `<storage_configuration>` tag either in the main file `config.xml` or in a distinct file in the `config.d` directory. This section in a configuration file has the following structure:
Disks, volumes and storage policies should be declared inside the `<storage_configuration>` tag either in the main file `config.xml` or in a distinct file in the `config.d` directory.
Configuration structure:
```xml
<disks>
    <disk_name_1> <!-- disk name -->
        <path>/mnt/fast_ssd/clickhouse</path>
    </disk_name_1>
    <disk_name_2>
        <path>/mnt/hdd1/clickhouse</path>
        <keep_free_space_bytes>10485760</keep_free_space_bytes>
    </disk_name_2>
    <disk_name_3>
        <path>/mnt/hdd2/clickhouse</path>
        <keep_free_space_bytes>10485760</keep_free_space_bytes>
    </disk_name_3>
...
</disks>
```
where
Tags:
* the disk name is given as a tag name.
* `path` — path under which a server will store data (`data` and `shadow` folders), should be terminated with '/'.
* `keep_free_space_bytes` — the amount of free disk space to be reserved.
- `<disk_name_N>` — Disk name. Names must be different for all disks.
- `path` — path under which a server will store data (`data` and `shadow` folders); it should be terminated with '/'.
- `keep_free_space_bytes` — the amount of free disk space to be reserved.
The order of the disk definition is not important.
Storage policies configuration:
Storage policies configuration markup:
```xml
<policies>
<policy_name_1>
<volumes>
<volume_name_1>
<disk>disk_name_from_disks_configuration</disk>
<max_data_part_size_bytes>1073741824</max_data_part_size_bytes>
</volume_name_1>
<volume_name_2>
<!-- configuration -->
</volume_name_2>
<!-- more volumes -->
</volumes>
<move_factor>0.2</move_factor>
</policy_name_1>
<policy_name_2>
<!-- configuration -->
</policy_name_2>
<!-- more policies -->
</policies>
```
Tags:
- `policy_name_N` — Policy name. Policy names must be unique.
- `volume_name_N` — Volume name. Volume names must be unique.
- `disk` — a disk within a volume.
- `max_data_part_size_bytes` — the maximum size of a part that can be stored on any of the volume's disks.
- `move_factor` — when the amount of available space gets lower than this factor, data automatically start to move to the next volume, if any (by default, 0.1).
Configuration examples:
```xml
<policies>
@ -536,16 +574,9 @@ Storage policies configuration:
</policies>
```
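Once the server has loaded such a configuration, the resulting policies can be inspected through the `system.storage_policies` table mentioned above; exact column names may vary between versions, so a plain `SELECT *` is shown here as a sketch.

```sql
-- Lists every volume of every configured policy together with its disks.
SELECT *
FROM system.storage_policies;
```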
where
In the given example, the `hdd_in_order` policy implements the [round-robin](https://en.wikipedia.org/wiki/Round-robin_scheduling) approach. Since this policy defines only one volume (`single`), the data parts are stored on all of its disks in circular order. Such a policy can be quite useful if several similar disks are mounted to the system, but RAID is not configured. Keep in mind that each individual disk drive is not reliable and you might want to compensate for that with a replication factor of 3 or more.
* volume and storage policy names are given as tag names.
* `disk` — a disk within a volume.
* `max_data_part_size_bytes` — the maximum size of a part that can be stored on any of the volume's disks.
* `move_factor` — when the amount of available space gets lower than this factor, data automatically start to move to the next volume, if any (by default, 0.1).
In the given example, the `hdd_in_order` policy implements the [round-robin](https://en.wikipedia.org/wiki/Round-robin_scheduling) approach. Since the policy defines only one volume (`single`), the data are stored on all its disks in circular order. Such a policy can be quite useful if there are several similar disks mounted to the system. If there are different disks, the policy `moving_from_ssd_to_hdd` can be used instead.
The volume `hot` consists of an SSD disk (`fast_ssd`), and the maximum size of a part that can be stored on this volume is 1GB. All the parts with the size larger than 1GB will be stored directly on the `cold` volume, which contains an HDD disk `disk1`.
If there are different kinds of disks available in the system, the `moving_from_ssd_to_hdd` policy can be used instead. The volume `hot` consists of an SSD disk (`fast_ssd`), and the maximum size of a part that can be stored on this volume is 1GB. All parts larger than 1GB will be stored directly on the `cold` volume, which contains an HDD disk `disk1`.
Also, once the disk `fast_ssd` gets filled by more than 80%, data will be transferred to `disk1` by a background process.
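To see how full each disk currently is (and therefore how close the background mover is to its threshold), the `system.disks` table can be queried; the column names below are assumed from this version and may differ elsewhere.

```sql
-- Free and total space per configured disk; formatReadableSize is used only
-- to produce human-readable values.
SELECT
    name,
    path,
    formatReadableSize(free_space) AS free,
    formatReadableSize(total_space) AS total
FROM system.disks;
```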
The order of volume enumeration within a storage policy is important. Once a volume is overfilled, data are moved to the next one. The order of disk enumeration is important as well because data are stored on them in turns.
@ -568,12 +599,12 @@ The `default` storage policy implies using only one volume, which consists of on
### Details
In the case of MergeTree tables, data is getting to disk in different ways:
In the case of `MergeTree` tables, data gets to a disk in different ways:
* as a result of an insert (`INSERT` query).
* during background merges and [mutations](../../query_language/alter.md#alter-mutations).
* when downloading from another replica.
* as a result of partition freezing [ALTER TABLE ... FREEZE PARTITION](../../query_language/alter.md#alter_freeze-partition).
- As a result of an insert (`INSERT` query).
- During background merges and [mutations](../../query_language/alter.md#alter-mutations).
- When downloading from another replica.
- As a result of partition freezing [ALTER TABLE ... FREEZE PARTITION](../../query_language/alter.md#alter_freeze-partition).
In all these cases except for mutations and partition freezing, a part is stored on a volume and a disk according to the given storage policy:
@ -592,3 +623,4 @@ Moving data does not interfere with data replication. Therefore, different stora
After the completion of background merges and mutations, old parts are removed only after a certain amount of time (`old_parts_lifetime`).
During this time, they are not moved to other volumes or disks. Therefore, until the parts are finally removed, they are still taken into account for evaluation of the occupied disk space.
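One way to check which disk each part occupies, including old parts that are still counted towards disk usage, is to query `system.parts`; the `disk_name` and `bytes_on_disk` columns are assumed here, and `hits_tiered` is the hypothetical table from the earlier sketch.

```sql
-- active = 0 rows are outdated parts that still occupy space
-- until old_parts_lifetime expires.
SELECT name, disk_name, active, formatReadableSize(bytes_on_disk) AS size
FROM system.parts
WHERE database = 'default' AND table = 'hits_tiered';
```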
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/mergetree/) <!--hide-->
Some files were not shown because too many files have changed in this diff.