Merge branch 'master' of https://github.com/yandex/ClickHouse into CLICKHOUSE-2720

This commit is contained in:
Ivan Blinkov 2018-07-09 11:55:04 +03:00
commit cbf6ce6c9d
96 changed files with 1188 additions and 671 deletions

View File

@ -1,4 +1,15 @@
# en:
## Improvements:
* Added Nullable support for runningDifference function. [#2590](https://github.com/yandex/ClickHouse/issues/2590)
## Bug fiexs:
* Fixed switching to default databses in case of client reconection. [#2580](https://github.com/yandex/ClickHouse/issues/2580)
# ru:
## Улучшения:
* Добавлена поддержка Nullable для функции runningDifference. [#2590](https://github.com/yandex/ClickHouse/issues/2590)
## Исправление ошибок:
* Исправлено переключение на дефолтную базу данных при переподключении клиента. [#2580](https://github.com/yandex/ClickHouse/issues/2580)

View File

@ -1,7 +1,7 @@
# This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54388-testing)
set(VERSION_REVISION 54388)
set(VERSION_GITHASH 2447755700f40af317cb80ba8800b94d6350d148)
set(VERSION_DESCRIBE v1.1.54390-testing)
set(VERSION_REVISION 54390)
set(VERSION_GITHASH 75651605338a0a4688acedd94e96650ee981789f)
# end of autochange
set (VERSION_MAJOR 1)

View File

@ -367,10 +367,20 @@ void read(String & s, ReadBuffer & in)
static constexpr int32_t max_string_size = 1 << 20;
int32_t size = 0;
read(size, in);
if (size < 0) /// TODO Actually it means that zookeeper node has NULL value. Maybe better to treat it like empty string.
if (size == -1)
{
/// It means that zookeeper node has NULL value. We will treat it like empty string.
s.clear();
return;
}
if (size < 0)
throw Exception("Negative size while reading string from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
if (size > max_string_size)
throw Exception("Too large string size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
s.resize(size);
in.read(&s[0], size);
}

View File

@ -19,13 +19,6 @@
/// The size of the I/O buffer by default.
#define DBMS_DEFAULT_BUFFER_SIZE 1048576ULL
/// When writing data, a buffer of `max_compress_block_size` size is allocated for compression. When the buffer overflows or if into the buffer
/// more or equal data is written than `min_compress_block_size`, then with the next mark, the data will also compressed
/// As a result, for small columns (numbers 1-8 bytes), with index_granularity = 8192, the block size will be 64 KB.
/// And for large columns (Title - string ~100 bytes), the block size will be ~819 KB. Due to this, the compression ratio almost does not get worse.
#define DEFAULT_MIN_COMPRESS_BLOCK_SIZE 65536
#define DEFAULT_MAX_COMPRESS_BLOCK_SIZE 1048576
/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
*/
@ -43,17 +36,12 @@
*/
#define DEFAULT_MERGE_BLOCK_SIZE 8192
#define DEFAULT_MAX_QUERY_SIZE 262144
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
#define DEFAULT_MAX_DISTRIBUTED_CONNECTIONS 1024
#define DEFAULT_INTERACTIVE_DELAY 100000
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
/// each period reduces the error counter by 2 times
/// too short a period can cause errors to disappear immediately after creation.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2 * DBMS_DEFAULT_SEND_TIMEOUT_SEC)
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
@ -65,8 +53,6 @@
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226
#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -1,20 +0,0 @@
#include <sstream>
#include <Core/SortDescription.h>
#include <Columns/Collator.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
std::string SortColumnDescription::getID() const
{
WriteBufferFromOwnString out;
out << column_name << ", " << column_number << ", " << direction << ", " << nulls_direction;
if (collator)
out << ", collation locale: " << collator->getLocale();
return out.str();
}
}

View File

@ -26,9 +26,6 @@ struct SortColumnDescription
SortColumnDescription(const std::string & column_name_, int direction_, int nulls_direction_, const std::shared_ptr<Collator> & collator_ = nullptr)
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_), collator(collator_) {}
/// For IBlockInputStream.
std::string getID() const;
};
/// Description of the sorting rule for several columns.

View File

@ -119,7 +119,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (!done_with_set)
{
if (!subquery.set->insertFromBlock(block, /*fill_set_elements=*/false))
if (!subquery.set->insertFromBlock(block))
done_with_set = true;
}

View File

@ -3,6 +3,8 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnTuple.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/FieldVisitors.h>
@ -74,7 +76,8 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
}
else
{
if (!column.type->isSummable())
bool is_agg_func = checkDataType<DataTypeAggregateFunction>(column.type.get());
if (!column.type->isSummable() && !is_agg_func)
{
column_numbers_not_to_aggregate.push_back(i);
continue;
@ -93,8 +96,14 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
{
// Create aggregator to sum this column
AggregateDescription desc;
desc.is_agg_func_type = is_agg_func;
desc.column_numbers = {i};
desc.init("sumWithOverflow", {column.type});
if (!is_agg_func)
{
desc.init("sumWithOverflow", {column.type});
}
columns_to_aggregate.emplace_back(std::move(desc));
}
else
@ -193,27 +202,34 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
// Do not insert if the aggregation state hasn't been created
if (desc.created)
{
try
if (desc.is_agg_func_type)
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
/// Update zero status of current row
if (desc.column_numbers.size() == 1)
{
// Flag row as non-empty if at least one column number if non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
}
else
{
/// It is sumMap aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
}
current_row_is_zero = false;
}
catch (...)
else
{
desc.destroyState();
throw;
try
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
/// Update zero status of current row
if (desc.column_numbers.size() == 1)
{
// Flag row as non-empty if at least one column number if non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
}
else
{
/// It is sumMap aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
}
}
catch (...)
{
desc.destroyState();
throw;
}
}
desc.destroyState();
}
@ -258,7 +274,7 @@ Block SummingSortedBlockInputStream::readImpl()
for (auto & desc : columns_to_aggregate)
{
// Wrap aggregated columns in a tuple to match function signature
if (checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
if (!desc.is_agg_func_type && checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
{
size_t tuple_size = desc.column_numbers.size();
MutableColumns tuple_columns(tuple_size);
@ -277,7 +293,7 @@ Block SummingSortedBlockInputStream::readImpl()
/// Place aggregation results into block.
for (auto & desc : columns_to_aggregate)
{
if (checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
if (!desc.is_agg_func_type && checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
{
/// Unpack tuple into block.
size_t tuple_size = desc.column_numbers.size();
@ -465,23 +481,32 @@ void SummingSortedBlockInputStream::addRow(SortCursor & cursor)
{
for (auto & desc : columns_to_aggregate)
{
if (!desc.created)
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);
// Specialized case for unary functions
if (desc.column_numbers.size() == 1)
if (desc.is_agg_func_type)
{
// desc.state is not used for AggregateFunction types
auto & col = cursor->all_columns[desc.column_numbers[0]];
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
static_cast<ColumnAggregateFunction &>(*desc.merged_column).insertMergeFrom(*col, cursor->pos);
}
else
{
// Gather all source columns into a vector
ColumnRawPtrs columns(desc.column_numbers.size());
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
columns[i] = cursor->all_columns[desc.column_numbers[i]];
if (!desc.created)
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);
desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr);
// Specialized case for unary functions
if (desc.column_numbers.size() == 1)
{
auto & col = cursor->all_columns[desc.column_numbers[0]];
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
}
else
{
// Gather all source columns into a vector
ColumnRawPtrs columns(desc.column_numbers.size());
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
columns[i] = cursor->all_columns[desc.column_numbers[i]];
desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr);
}
}
}
}

View File

@ -69,6 +69,7 @@ private:
/// Stores aggregation function, state, and columns to be used as function arguments
struct AggregateDescription
{
/// An aggregate function 'sumWithOverflow' or 'sumMap' for summing.
AggregateFunctionPtr function;
IAggregateFunction::AddFunc add_function = nullptr;
std::vector<size_t> column_numbers;
@ -76,6 +77,9 @@ private:
std::vector<char> state;
bool created = false;
/// In case when column has type AggregateFunction: use the aggregate function from itself instead of 'function' above.
bool is_agg_func_type = false;
void init(const char * function_name, const DataTypes & argument_types)
{
function = AggregateFunctionFactory::instance().get(function_name, argument_types);
@ -87,7 +91,10 @@ private:
{
if (created)
return;
function->create(state.data());
if (is_agg_func_type)
merged_column->insertDefault();
else
function->create(state.data());
created = true;
}
@ -95,7 +102,8 @@ private:
{
if (!created)
return;
function->destroy(state.data());
if (!is_agg_func_type)
function->destroy(state.data());
created = false;
}

View File

@ -241,7 +241,7 @@ void DataTypeAggregateFunction::serializeTextCSV(const IColumn & column, size_t
void DataTypeAggregateFunction::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String s;
readCSV(s, istr, settings.csv.delimiter);
readCSV(s, istr, settings.csv);
deserializeFromString(function, column, s);
}

View File

@ -415,7 +415,7 @@ void DataTypeArray::serializeTextCSV(const IColumn & column, size_t row_num, Wri
void DataTypeArray::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String s;
readCSV(s, istr, settings.csv.delimiter);
readCSV(s, istr, settings.csv);
ReadBufferFromString rb(s);
deserializeText(column, rb, settings);
}

View File

@ -194,7 +194,7 @@ template <typename Type>
void DataTypeEnum<Type>::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
std::string name;
readCSVString(name, istr, settings.csv.delimiter);
readCSVString(name, istr, settings.csv);
static_cast<ColumnType &>(column).getData().push_back(getValue(StringRef(name)));
}

View File

@ -197,7 +197,7 @@ void DataTypeFixedString::serializeTextCSV(const IColumn & column, size_t row_nu
void DataTypeFixedString::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read(*this, column, [&istr, delimiter = settings.csv.delimiter](ColumnFixedString::Chars_t & data) { readCSVStringInto(data, istr, delimiter); });
read(*this, column, [&istr, &csv = settings.csv](ColumnFixedString::Chars_t & data) { readCSVStringInto(data, istr, csv); });
}

View File

@ -245,6 +245,12 @@ bool DataTypeNumberBase<T>::isValueRepresentedByInteger() const
return std::is_integral_v<T>;
}
template <typename T>
bool DataTypeNumberBase<T>::isValueRepresentedByUnsignedInteger() const
{
return std::is_integral_v<T> && std::is_unsigned_v<T>;
}
/// Explicit template instantiations - to avoid code bloat in headers.
template class DataTypeNumberBase<UInt8>;

View File

@ -46,6 +46,7 @@ public:
bool isComparable() const override { return true; }
bool isValueRepresentedByNumber() const override { return true; }
bool isValueRepresentedByInteger() const override;
bool isValueRepresentedByUnsignedInteger() const override;
bool isValueUnambiguouslyRepresentedInContiguousMemoryRegion() const override { return true; }
bool haveMaximumSizeOfValue() const override { return true; }
size_t getSizeOfValueInMemory() const override { return sizeof(T); }

View File

@ -288,7 +288,7 @@ void DataTypeString::serializeTextCSV(const IColumn & column, size_t row_num, Wr
void DataTypeString::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read(column, [&](ColumnString::Chars_t & data) { readCSVStringInto(data, istr, settings.csv.delimiter); });
read(column, [&](ColumnString::Chars_t & data) { readCSVStringInto(data, istr, settings.csv); });
}

View File

@ -307,6 +307,10 @@ public:
*/
virtual bool isValueRepresentedByInteger() const { return false; }
/** Unsigned Integers, Date, DateTime. Not nullable.
*/
virtual bool isValueRepresentedByUnsignedInteger() const { return false; }
/** Values are unambiguously identified by contents of contiguous memory region,
* that can be obtained by IColumn::getDataAt method.
* Examples: numbers, Date, DateTime, String, FixedString,

View File

@ -83,16 +83,16 @@ static inline void skipWhitespacesAndTabs(ReadBuffer & buf)
}
static void skipRow(ReadBuffer & istr, const char delimiter, size_t num_columns)
static void skipRow(ReadBuffer & istr, const FormatSettings::CSV & settings, size_t num_columns)
{
String tmp;
for (size_t i = 0; i < num_columns; ++i)
{
skipWhitespacesAndTabs(istr);
readCSVString(tmp, istr, delimiter);
readCSVString(tmp, istr, settings);
skipWhitespacesAndTabs(istr);
skipDelimiter(istr, delimiter, i + 1 == num_columns);
skipDelimiter(istr, settings.delimiter, i + 1 == num_columns);
}
}
@ -107,7 +107,7 @@ void CSVRowInputStream::readPrefix()
String tmp;
if (with_names)
skipRow(istr, format_settings.csv.delimiter, num_columns);
skipRow(istr, format_settings.csv, num_columns);
}

View File

@ -37,6 +37,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
FormatSettings format_settings;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.date_time_input_format = settings.date_time_input_format;
@ -59,6 +61,8 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.write_statistics = settings.output_format_write_statistics;

View File

@ -24,6 +24,8 @@ struct FormatSettings
struct CSV
{
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
};
CSV csv;

View File

@ -204,7 +204,7 @@ private:
static bool hasNull(const PaddedPODArray<UInt8> & null_map, size_t i)
{
return null_map[i] == 1;
return null_map[i];
}
/// Both function arguments are ordinary.
@ -287,7 +287,7 @@ private:
for (size_t j = 0; j < array_size; ++j)
{
if (null_map_data[current_offset + j] == 1)
if (null_map_data[current_offset + j])
{
}
else if (compare(data[current_offset + j], value, i))
@ -324,7 +324,7 @@ private:
for (size_t j = 0; j < array_size; ++j)
{
bool hit = false;
if (null_map_data[current_offset + j] == 1)
if (null_map_data[current_offset + j])
{
if (hasNull(null_map_item, i))
hit = true;
@ -394,11 +394,6 @@ struct ArrayIndexNumNullImpl
size_t size = offsets.size();
result.resize(size);
if (!null_map_data)
return;
const auto & null_map_ref = *null_map_data;
ColumnArray::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
@ -407,7 +402,7 @@ struct ArrayIndexNumNullImpl
for (size_t j = 0; j < array_size; ++j)
{
if (null_map_ref[current_offset + j] == 1)
if (null_map_data && (*null_map_data)[current_offset + j])
{
if (!IndexConv::apply(j, current))
break;
@ -433,11 +428,6 @@ struct ArrayIndexStringNullImpl
const auto size = offsets.size();
result.resize(size);
if (!null_map_data)
return;
const auto & null_map_ref = *null_map_data;
ColumnArray::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
@ -446,8 +436,7 @@ struct ArrayIndexStringNullImpl
for (size_t j = 0; j < array_size; ++j)
{
size_t k = (current_offset == 0 && j == 0) ? 0 : current_offset + j - 1;
if (null_map_ref[k] == 1)
if (null_map_data && (*null_map_data)[current_offset + j])
{
if (!IndexConv::apply(j, current))
break;
@ -487,8 +476,7 @@ struct ArrayIndexStringImpl
ColumnArray::Offset string_size = string_offsets[current_offset + j] - string_pos;
size_t k = (current_offset == 0 && j == 0) ? 0 : current_offset + j - 1;
if (null_map_data && ((*null_map_data)[k] == 1))
if (null_map_data && (*null_map_data)[current_offset + j])
{
}
else if (string_size == value_size + 1 && 0 == memcmp(value.data(), &data[string_pos], value_size))
@ -524,21 +512,20 @@ struct ArrayIndexStringImpl
for (size_t j = 0; j < array_size; ++j)
{
ColumnArray::Offset string_pos = current_offset == 0 && j == 0
? 0
: string_offsets[current_offset + j - 1];
? 0
: string_offsets[current_offset + j - 1];
ColumnArray::Offset string_size = string_offsets[current_offset + j] - string_pos;
bool hit = false;
size_t k = (current_offset == 0 && j == 0) ? 0 : current_offset + j - 1;
if (null_map_data && ((*null_map_data)[k] == 1))
if (null_map_data && (*null_map_data)[current_offset + j])
{
if (null_map_item && ((*null_map_item)[i] == 1))
if (null_map_item && (*null_map_item)[i])
hit = true;
}
else if (string_size == value_size && 0 == memcmp(&item_values[value_pos], &data[string_pos], value_size))
hit = true;
hit = true;
if (hit)
{
@ -638,7 +625,7 @@ private:
for (size_t j = 0; j < array_size; ++j)
{
if (null_map_data[current_offset + j] == 1)
if (null_map_data[current_offset + j])
{
}
else if (0 == data.compareAt(current_offset + j, is_value_has_single_element_to_compare ? 0 : i, value, 1))
@ -674,9 +661,9 @@ private:
for (size_t j = 0; j < array_size; ++j)
{
bool hit = false;
if (null_map_data[current_offset + j] == 1)
if (null_map_data[current_offset + j])
{
if (null_map_item[i] == 1)
if (null_map_item[i])
hit = true;
}
else if (0 == data.compareAt(current_offset + j, is_value_has_single_element_to_compare ? 0 : i, value, 1))
@ -724,11 +711,6 @@ struct ArrayIndexGenericNullImpl
size_t size = offsets.size();
result.resize(size);
if (!null_map_data)
return;
const auto & null_map_ref = *null_map_data;
ColumnArray::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
@ -737,7 +719,7 @@ struct ArrayIndexGenericNullImpl
for (size_t j = 0; j < array_size; ++j)
{
if (null_map_ref[current_offset + j] == 1)
if (null_map_data && (*null_map_data)[current_offset + j])
{
if (!IndexConv::apply(j, current))
break;
@ -931,7 +913,7 @@ private:
if (arr[i].isNull())
{
if (null_map && ((*null_map)[row] == 1))
if (null_map && (*null_map)[row])
hit = true;
}
else if (applyVisitor(FieldVisitorAccurateEquals(), arr[i], value))

View File

@ -993,9 +993,17 @@ struct ToIntMonotonicity
{
size_t size_of_type = type.getSizeOfValueInMemory();
/// If type is expanding, then function is monotonic.
/// If type is expanding
if (sizeof(T) > size_of_type)
return { true, true, true };
{
/// If convert signed -> signed or unsigned -> signed, then function is monotonic.
if (std::is_signed_v<T> || type.isValueRepresentedByUnsignedInteger())
return {true, true, true};
/// If arguments from the same half, then function is monotonic.
if ((left.get<Int64>() >= 0) == (right.get<Int64>() >= 0))
return {true, true, true};
}
/// If type is same, too. (Enum has separate case, because it is different data type)
if (checkDataType<DataTypeNumber<T>>(&type) ||

View File

@ -33,6 +33,7 @@
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>
#include <Storages/getStructureOfRemoteTable.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
@ -1468,7 +1469,7 @@ private:
/// It is possible to track value from previous block, to calculate continuously across all blocks. Not implemented.
template <typename Src, typename Dst>
static void process(const PaddedPODArray<Src> & src, PaddedPODArray<Dst> & dst)
static void process(const PaddedPODArray<Src> & src, PaddedPODArray<Dst> & dst, const NullMap * null_map)
{
size_t size = src.size();
dst.resize(size);
@ -1478,13 +1479,26 @@ private:
/// It is possible to SIMD optimize this loop. By no need for that in practice.
dst[0] = is_first_line_zero ? 0 : src[0];
Src prev = src[0];
for (size_t i = 1; i < size; ++i)
Src prev;
bool has_prev_value = false;
for (size_t i = 0; i < size; ++i)
{
auto cur = src[i];
dst[i] = static_cast<Dst>(cur) - prev;
prev = cur;
if (null_map && (*null_map)[i])
continue;
if (!has_prev_value)
{
dst[i] = is_first_line_zero ? 0 : src[i];
prev = src[i];
has_prev_value = true;
}
else
{
auto cur = src[i];
dst[i] = static_cast<Dst>(cur) - prev;
prev = cur;
}
}
}
@ -1547,14 +1561,19 @@ public:
return false;
}
bool useDefaultImplementationForNulls() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
DataTypePtr res;
dispatchForSourceType(*arguments[0], [&](auto field_type_tag)
dispatchForSourceType(*removeNullable(arguments[0]), [&](auto field_type_tag)
{
res = std::make_shared<DataTypeNumber<DstFieldType<decltype(field_type_tag)>>>();
});
if (arguments[0]->isNullable())
res = makeNullable(res);
return res;
}
@ -1570,16 +1589,29 @@ public:
return;
}
auto res_column = res_type->createColumn();
auto res_column = removeNullable(res_type)->createColumn();
auto * src_column = src.column.get();
ColumnPtr null_map_column = nullptr;
const NullMap * null_map = nullptr;
if (auto * nullable_column = checkAndGetColumn<ColumnNullable>(src_column))
{
src_column = &nullable_column->getNestedColumn();
null_map_column = nullable_column->getNullMapColumnPtr();
null_map = &nullable_column->getNullMapData();
}
dispatchForSourceType(*src.type, [&](auto field_type_tag)
dispatchForSourceType(*removeNullable(src.type), [&](auto field_type_tag)
{
using SrcFieldType = decltype(field_type_tag);
process(static_cast<const ColumnVector<SrcFieldType> &>(*src.column).getData(),
static_cast<ColumnVector<DstFieldType<SrcFieldType>> &>(*res_column).getData());
process(static_cast<const ColumnVector<SrcFieldType> &>(*src_column).getData(),
static_cast<ColumnVector<DstFieldType<SrcFieldType>> &>(*res_column).getData(), null_map);
});
block.getByPosition(result).column = std::move(res_column);
if (null_map_column)
block.getByPosition(result).column = ColumnNullable::create(std::move(res_column), null_map_column);
else
block.getByPosition(result).column = std::move(res_column);
}
};

View File

@ -2,6 +2,7 @@
#include <Common/hex.h>
#include <Common/PODArray.h>
#include <Common/StringUtils/StringUtils.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/readFloatText.h>
@ -500,18 +501,19 @@ void readBackQuotedStringWithSQLStyle(String & s, ReadBuffer & buf)
template <typename Vector>
void readCSVStringInto(Vector & s, ReadBuffer & buf, const char delimiter)
void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV & settings)
{
if (buf.eof())
throwReadAfterEOF();
char maybe_quote = *buf.position();
const char delimiter = settings.delimiter;
const char maybe_quote = *buf.position();
/// Emptiness and not even in quotation marks.
if (maybe_quote == delimiter)
return;
if (maybe_quote == '\'' || maybe_quote == '"')
if ((settings.allow_single_quotes && maybe_quote == '\'') || (settings.allow_double_quotes && maybe_quote == '"'))
{
++buf.position();
@ -575,13 +577,13 @@ void readCSVStringInto(Vector & s, ReadBuffer & buf, const char delimiter)
}
}
void readCSVString(String & s, ReadBuffer & buf, const char delimiter)
void readCSVString(String & s, ReadBuffer & buf, const FormatSettings::CSV & settings)
{
s.clear();
readCSVStringInto(s, buf, delimiter);
readCSVStringInto(s, buf, settings);
}
template void readCSVStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf, const char delimiter);
template void readCSVStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
template <typename Vector, typename ReturnType>

View File

@ -20,6 +20,8 @@
#include <Common/Arena.h>
#include <Common/UInt128.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/VarInt.h>
@ -398,7 +400,8 @@ void readStringUntilEOF(String & s, ReadBuffer & buf);
/** Read string in CSV format.
* Parsing rules:
* - string could be placed in quotes; quotes could be single: ' or double: ";
* - string could be placed in quotes; quotes could be single: ' if FormatSettings::CSV::allow_single_quotes is true
* or double: " if FormatSettings::CSV::allow_double_quotes is true;
* - or string could be unquoted - this is determined by first character;
* - if string is unquoted, then it is read until next delimiter,
* either until end of line (CR or LF),
@ -407,7 +410,7 @@ void readStringUntilEOF(String & s, ReadBuffer & buf);
* - if string is in quotes, then it will be read until closing quote,
* but sequences of two consecutive quotes are parsed as single quote inside string;
*/
void readCSVString(String & s, ReadBuffer & buf, const char delimiter);
void readCSVString(String & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
/// Read and append result to array of characters.
@ -430,7 +433,7 @@ template <typename Vector>
void readStringUntilEOFInto(Vector & s, ReadBuffer & buf);
template <typename Vector>
void readCSVStringInto(Vector & s, ReadBuffer & buf, const char delimiter);
void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
/// ReturnType is either bool or void. If bool, the function will return false instead of throwing an exception.
template <typename Vector, typename ReturnType = void>
@ -688,7 +691,7 @@ template <typename T>
inline std::enable_if_t<std::is_arithmetic_v<T>, void>
readCSV(T & x, ReadBuffer & buf) { readCSVSimple(x, buf); }
inline void readCSV(String & x, ReadBuffer & buf, const char delimiter = ',') { readCSVString(x, buf, delimiter); }
inline void readCSV(String & x, ReadBuffer & buf, const FormatSettings::CSV & settings) { readCSVString(x, buf, settings); }
inline void readCSV(LocalDate & x, ReadBuffer & buf) { readCSVSimple(x, buf); }
inline void readCSV(LocalDateTime & x, ReadBuffer & buf) { readCSVSimple(x, buf); }
inline void readCSV(UUID & x, ReadBuffer & buf) { readCSVSimple(x, buf); }

View File

@ -1731,9 +1731,9 @@ void Context::setFormatSchemaPath(const String & path)
shared->format_schema_path = path;
}
Context::getSampleBlockCacheType & Context::getSampleBlockCache() const
Context::SampleBlockCache & Context::getSampleBlockCache() const
{
return getQueryContext().get_sample_block_cache;
return getQueryContext().sample_block_cache;
}
std::shared_ptr<ActionLocksManager> Context::getActionLocksManager()

View File

@ -121,6 +121,9 @@ private:
UInt64 session_close_cycle = 0;
bool session_is_used = false;
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;
using DatabasePtr = std::shared_ptr<IDatabase>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;
@ -401,9 +404,7 @@ public:
/// User name and session identifier. Named sessions are local to users.
using SessionKey = std::pair<String, String>;
using getSampleBlockCacheType = std::unordered_map<std::string, Block>;
mutable Context::getSampleBlockCacheType get_sample_block_cache;
getSampleBlockCacheType & getSampleBlockCache() const;
SampleBlockCache & getSampleBlockCache() const;
private:
/** Check if the current client has access to the specified database.

View File

@ -1473,17 +1473,29 @@ void ExpressionAnalyzer::makeSetsForIndex()
}
void ExpressionAnalyzer::tryMakeSetFromSubquery(const ASTPtr & subquery_or_table_name)
void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
{
BlockIO res = interpretSubquery(subquery_or_table_name, context, subquery_depth + 1, {})->execute();
SetPtr set = std::make_shared<Set>(SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode));
SizeLimits set_for_index_size_limits;
if (settings.use_index_for_in_with_subqueries_max_values && settings.use_index_for_in_with_subqueries_max_values < settings.max_rows_in_set)
{
/// Silently cancel creating the set for index if the specific limit has been reached.
set_for_index_size_limits = SizeLimits(settings.use_index_for_in_with_subqueries_max_values, settings.max_bytes_in_set, OverflowMode::BREAK);
}
else
{
/// If the limit specific for set for index is lower than general limits for set - use general limit.
set_for_index_size_limits = SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode);
}
SetPtr set = std::make_shared<Set>(set_for_index_size_limits, true);
set->setHeader(res.in->getHeader());
while (Block block = res.in->read())
{
/// If the limits have been exceeded, give up and let the default subquery processing actions take place.
if (!set->insertFromBlock(block, true))
if (!set->insertFromBlock(block))
return;
}
@ -1521,7 +1533,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
if (typeid_cast<ASTSubquery *>(arg.get()) || typeid_cast<ASTIdentifier *>(arg.get()))
{
if (settings.use_index_for_in_with_subqueries)
tryMakeSetFromSubquery(arg);
tryMakeSetForIndexFromSubquery(arg);
}
else
{
@ -1589,7 +1601,7 @@ void ExpressionAnalyzer::makeSet(const ASTFunction * node, const Block & sample_
return;
}
SetPtr set = std::make_shared<Set>(SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode));
SetPtr set = std::make_shared<Set>(SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode), false);
/** The following happens for GLOBAL INs:
* - in the addExternalStorage function, the IN (SELECT ...) subquery is replaced with IN _data1,
@ -1711,8 +1723,8 @@ void ExpressionAnalyzer::makeExplicitSet(const ASTFunction * node, const Block &
+ left_arg_type->getName() + " and " + right_arg_type->getName() + ".",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
SetPtr set = std::make_shared<Set>(SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode));
set->createFromAST(set_element_types, elements_ast, context, create_ordered_set);
SetPtr set = std::make_shared<Set>(SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode), create_ordered_set);
set->createFromAST(set_element_types, elements_ast, context);
prepared_sets[right_arg->range] = std::move(set);
}

View File

@ -345,7 +345,7 @@ private:
* Create Set from a subuqery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetFromSubquery(const ASTPtr & subquery_or_table_name);
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
void makeSetsForIndexImpl(const ASTPtr & node, const Block & sample_block);

View File

@ -18,9 +18,6 @@ BlockIO InterpreterOptimizeQuery::execute()
{
const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr);
if (ast.final && !ast.partition)
throw Exception("FINAL flag for OPTIMIZE query is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS);
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);

View File

@ -1,3 +1,5 @@
#include <optional>
#include <Core/Field.h>
#include <Common/FieldVisitors.h>
#include <Core/Row.h>
@ -20,9 +22,12 @@
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/NullableUtils.h>
#include <Interpreters/sortBlock.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <ext/range.h>
namespace DB
{
@ -43,22 +48,34 @@ void NO_INLINE Set::insertFromBlockImpl(
const ColumnRawPtrs & key_columns,
size_t rows,
SetVariants & variants,
ConstNullMapPtr null_map)
ConstNullMapPtr null_map,
ColumnUInt8::Container * out_filter)
{
if (null_map)
insertFromBlockImplCase<Method, true>(method, key_columns, rows, variants, null_map);
{
if (out_filter)
insertFromBlockImplCase<Method, true, true>(method, key_columns, rows, variants, null_map, out_filter);
else
insertFromBlockImplCase<Method, true, false>(method, key_columns, rows, variants, null_map, out_filter);
}
else
insertFromBlockImplCase<Method, false>(method, key_columns, rows, variants, null_map);
{
if (out_filter)
insertFromBlockImplCase<Method, false, true>(method, key_columns, rows, variants, null_map, out_filter);
else
insertFromBlockImplCase<Method, false, false>(method, key_columns, rows, variants, null_map, out_filter);
}
}
template <typename Method, bool has_null_map>
template <typename Method, bool has_null_map, bool build_filter>
void NO_INLINE Set::insertFromBlockImplCase(
Method & method,
const ColumnRawPtrs & key_columns,
size_t rows,
SetVariants & variants,
ConstNullMapPtr null_map)
ConstNullMapPtr null_map,
ColumnUInt8::Container * out_filter)
{
typename Method::State state;
state.init(key_columns);
@ -78,6 +95,9 @@ void NO_INLINE Set::insertFromBlockImplCase(
if (inserted)
method.onNewKey(*it, keys_size, variants.string_pool);
if (build_filter)
(*out_filter)[i] = inserted;
}
}
@ -115,12 +135,22 @@ void Set::setHeader(const Block & block)
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
if (fill_set_elements)
{
/// Create empty columns with set values in advance.
/// It is needed because set may be empty, so method 'insertFromBlock' will be never called.
set_elements.reserve(keys_size);
for (const auto & type : data_types)
set_elements.emplace_back(removeNullable(type)->createColumn());
}
/// Choose data structure to use for the set.
data.init(data.chooseMethod(key_columns, key_sizes));
}
bool Set::insertFromBlock(const Block & block, bool fill_set_elements)
bool Set::insertFromBlock(const Block & block)
{
std::unique_lock lock(rwlock);
@ -152,13 +182,18 @@ bool Set::insertFromBlock(const Block & block, bool fill_set_elements)
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
/// Filter to extract distinct values from the block.
ColumnUInt8::MutablePtr filter;
if (fill_set_elements)
filter = ColumnUInt8::create(block.rows());
switch (data.type)
{
case SetVariants::Type::EMPTY:
break;
#define M(NAME) \
case SetVariants::Type::NAME: \
insertFromBlockImpl(*data.NAME, key_columns, rows, data, null_map); \
insertFromBlockImpl(*data.NAME, key_columns, rows, data, null_map, filter ? &filter->getData() : nullptr); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
@ -166,13 +201,13 @@ bool Set::insertFromBlock(const Block & block, bool fill_set_elements)
if (fill_set_elements)
{
for (size_t i = 0; i < rows; ++i)
for (size_t i = 0; i < keys_size; ++i)
{
std::vector<Field> new_set_elements;
for (size_t j = 0; j < keys_size; ++j)
new_set_elements.push_back((*key_columns[j])[i]);
set_elements->emplace_back(std::move(new_set_elements));
auto filtered_column = block.getByPosition(i).column->filter(filter->getData(), rows);
if (set_elements[i]->empty())
set_elements[i] = filtered_column;
else
set_elements[i]->assumeMutableRef().insertRangeFrom(*filtered_column, 0, filtered_column->size());
}
}
@ -196,7 +231,7 @@ static Field extractValueFromNode(ASTPtr & node, const IDataType & type, const C
}
void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & context, bool fill_set_elements)
void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & context)
{
/// Will form a block with values from the set.
@ -267,7 +302,7 @@ void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & co
}
Block block = header.cloneWithColumns(std::move(columns));
insertFromBlock(block, fill_set_elements);
insertFromBlock(block);
}
@ -403,9 +438,8 @@ void Set::executeOrdinary(
}
MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
: ordered_set(),
indexes_mapping(std::move(index_mapping_))
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
: indexes_mapping(std::move(index_mapping_))
{
std::sort(indexes_mapping.begin(), indexes_mapping.end(),
[](const KeyTuplePositionMapping & l, const KeyTuplePositionMapping & r)
@ -420,16 +454,23 @@ MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vect
return l.key_index == r.key_index;
}), indexes_mapping.end());
for (size_t i = 0; i < set_elements.size(); ++i)
{
std::vector<FieldWithInfinity> new_set_values;
for (size_t j = 0; j < indexes_mapping.size(); ++j)
new_set_values.emplace_back(set_elements[i][indexes_mapping[j].tuple_index]);
size_t tuple_size = indexes_mapping.size();
ordered_set.resize(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
ordered_set[i] = set_elements[indexes_mapping[i].tuple_index];
ordered_set.emplace_back(std::move(new_set_values));
Block block_to_sort;
SortDescription sort_description;
for (size_t i = 0; i < tuple_size; ++i)
{
block_to_sort.insert({ ordered_set[i], nullptr, "" });
sort_description.emplace_back(i, 1, 1);
}
std::sort(ordered_set.begin(), ordered_set.end());
sortBlock(block_to_sort, sort_description);
for (size_t i = 0; i < tuple_size; ++i)
ordered_set[i] = block_to_sort.getByPosition(i).column;
}
@ -439,15 +480,19 @@ MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vect
*/
BoolMask MergeTreeSetIndex::mayBeTrueInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types)
{
std::vector<FieldWithInfinity> left_point;
std::vector<FieldWithInfinity> right_point;
left_point.reserve(indexes_mapping.size());
right_point.reserve(indexes_mapping.size());
size_t tuple_size = indexes_mapping.size();
using FieldWithInfinityTuple = std::vector<FieldWithInfinity>;
FieldWithInfinityTuple left_point;
FieldWithInfinityTuple right_point;
left_point.reserve(tuple_size);
right_point.reserve(tuple_size);
bool invert_left_infinities = false;
bool invert_right_infinities = false;
for (size_t i = 0; i < indexes_mapping.size(); ++i)
for (size_t i = 0; i < tuple_size; ++i)
{
std::optional<Range> new_range = KeyCondition::applyMonotonicFunctionsChainToRange(
key_ranges[indexes_mapping[i].key_index],
@ -491,16 +536,40 @@ BoolMask MergeTreeSetIndex::mayBeTrueInRange(const std::vector<Range> & key_rang
}
}
/// This allows to construct tuple in 'ordered_set' at specified index for comparison with range.
auto indices = ext::range(0, ordered_set.at(0)->size());
auto extract_tuple = [tuple_size, this](size_t i)
{
/// Inefficient.
FieldWithInfinityTuple res;
res.reserve(tuple_size);
for (size_t j = 0; j < tuple_size; ++j)
res.emplace_back((*ordered_set[j])[i]);
return res;
};
auto compare = [&extract_tuple](size_t i, const FieldWithInfinityTuple & rhs)
{
return extract_tuple(i) < rhs;
};
/** Because each parallelogram maps to a contiguous sequence of elements
* layed out in the lexicographically increasing order, the set intersects the range
* if and only if either bound coincides with an element or at least one element
* is between the lower bounds
*/
auto left_lower = std::lower_bound(ordered_set.begin(), ordered_set.end(), left_point);
auto right_lower = std::lower_bound(ordered_set.begin(), ordered_set.end(), right_point);
return {left_lower != right_lower
|| (left_lower != ordered_set.end() && *left_lower == left_point)
|| (right_lower != ordered_set.end() && *right_lower == right_point), true};
auto left_lower = std::lower_bound(indices.begin(), indices.end(), left_point, compare);
auto right_lower = std::lower_bound(indices.begin(), indices.end(), right_point, compare);
return
{
left_lower != right_lower
|| (left_lower != indices.end() && extract_tuple(*left_lower) == left_point)
|| (right_lower != indices.end() && extract_tuple(*right_lower) == right_point),
true
};
}
}

View File

@ -18,21 +18,22 @@ namespace DB
struct Range;
class FieldWithInfinity;
using SetElements = std::vector<std::vector<Field>>;
using SetElementsPtr = std::unique_ptr<SetElements>;
class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
/** Data structure for implementation of IN expression.
*/
class Set
{
public:
Set(const SizeLimits & limits) :
log(&Logger::get("Set")),
limits(limits),
set_elements(std::make_unique<SetElements>())
/// 'fill_set_elements': in addition to hash table
/// (that is useful only for checking that some value is in the set and may not store the original values),
/// store all set elements in explicit form.
/// This is needed for subsequent use for index.
Set(const SizeLimits & limits, bool fill_set_elements)
: log(&Logger::get("Set")),
limits(limits), fill_set_elements(fill_set_elements)
{
}
@ -46,7 +47,7 @@ public:
* 'node' - list of values: 1, 2, 3 or list of tuples: (1, 2), (3, 4), (5, 6).
* 'fill_set_elements' - if true, fill vector of elements. For primary key to work.
*/
void createFromAST(const DataTypes & types, ASTPtr node, const Context & context, bool fill_set_elements);
void createFromAST(const DataTypes & types, ASTPtr node, const Context & context);
/** Create a Set from stream.
* Call setHeader, then call insertFromBlock for each block.
@ -54,7 +55,7 @@ public:
void setHeader(const Block & header);
/// Returns false, if some limit was exceeded and no need to insert more data.
bool insertFromBlock(const Block & block, bool fill_set_elements);
bool insertFromBlock(const Block & block);
/** For columns of 'block', check belonging of corresponding rows to the set.
* Return UInt8 column with the result.
@ -66,7 +67,8 @@ public:
const DataTypes & getDataTypes() const { return data_types; }
SetElements & getSetElements() { return *set_elements.get(); }
bool hasExplicitSetElements() const { return fill_set_elements; }
const Columns & getSetElements() const { return set_elements; }
private:
size_t keys_size = 0;
@ -99,6 +101,9 @@ private:
/// Limitations on the maximum size of the set
SizeLimits limits;
/// Do we need to additionally store all elements of the set in explicit form for subsequent use for index.
bool fill_set_elements;
/// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary(
const ColumnRawPtrs & key_columns,
@ -106,9 +111,9 @@ private:
bool negative,
const PaddedPODArray<UInt8> * null_map) const;
/// Vector of elements of `Set`.
/// Collected elements of `Set`.
/// It is necessary for the index to work on the primary key in the IN statement.
SetElementsPtr set_elements;
Columns set_elements;
/** Protects work with the set in the functions `insertFromBlock` and `execute`.
* These functions can be called simultaneously from different threads only when using StorageSet,
@ -123,15 +128,17 @@ private:
const ColumnRawPtrs & key_columns,
size_t rows,
SetVariants & variants,
ConstNullMapPtr null_map);
ConstNullMapPtr null_map,
ColumnUInt8::Container * out_filter);
template <typename Method, bool has_null_map>
template <typename Method, bool has_null_map, bool build_filter>
void insertFromBlockImplCase(
Method & method,
const ColumnRawPtrs & key_columns,
size_t rows,
SetVariants & variants,
ConstNullMapPtr null_map);
ConstNullMapPtr null_map,
ColumnUInt8::Container * out_filter);
template <typename Method>
void executeImpl(
@ -156,6 +163,7 @@ using SetPtr = std::shared_ptr<Set>;
using ConstSetPtr = std::shared_ptr<const Set>;
using Sets = std::vector<SetPtr>;
class IFunction;
using FunctionPtr = std::shared_ptr<IFunction>;
@ -164,8 +172,7 @@ class MergeTreeSetIndex
{
public:
/** Mapping for tuple positions from Set::set_elements to
* position of pk index and data type of this pk column
* and functions chain applied to this column.
* position of pk index and functions chain applied to this column.
*/
struct KeyTuplePositionMapping
{
@ -174,16 +181,14 @@ public:
std::vector<FunctionBasePtr> functions;
};
MergeTreeSetIndex(const SetElements & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_);
MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_);
size_t size() const { return ordered_set.size(); }
size_t size() const { return ordered_set.at(0)->size(); }
BoolMask mayBeTrueInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types);
private:
using OrderedTuples = std::vector<std::vector<FieldWithInfinity>>;
OrderedTuples ordered_set;
Columns ordered_set;
std::vector<KeyTuplePositionMapping> indexes_mapping;
};

View File

@ -33,32 +33,32 @@ struct Settings
*/
#define APPLY_FOR_SETTINGS(M) \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
M(SettingUInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, 1048576, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \
M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE, "The maximum block size for insertion, if we control the creation of blocks for insertion.") \
M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.") \
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.") \
M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.") \
M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.") \
M(SettingUInt64, max_distributed_connections, DEFAULT_MAX_DISTRIBUTED_CONNECTIONS, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \
M(SettingUInt64, max_query_size, DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)") \
M(SettingUInt64, interactive_delay, DEFAULT_INTERACTIVE_DELAY, "The interval in microseconds to check if the request is cancelled, and to send progress info.") \
M(SettingUInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \
M(SettingUInt64, max_query_size, 262144, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)") \
M(SettingUInt64, interactive_delay, 100000, "The interval in microseconds to check if the request is cancelled, and to send progress info.") \
M(SettingSeconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.") \
M(SettingMilliseconds, connect_timeout_with_failover_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS, "Connection timeout for selecting first healthy replica.") \
M(SettingSeconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "") \
M(SettingSeconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "") \
M(SettingMilliseconds, queue_max_wait_ms, DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, "The wait time in the request queue, if the number of concurrent requests exceeds the maximum.") \
M(SettingMilliseconds, queue_max_wait_ms, 5000, "The wait time in the request queue, if the number of concurrent requests exceeds the maximum.") \
M(SettingUInt64, poll_interval, DBMS_DEFAULT_POLL_INTERVAL, "Block at the query wait loop on the server for the specified number of seconds.") \
M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.") \
M(SettingUInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.") \
M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \
M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.") \
M(SettingUInt64, background_schedule_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.") \
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.") \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.") \
\
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS, "Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown.") \
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown.") \
\
M(SettingBool, distributed_directory_monitor_batch_inserts, false, "Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones.") \
\
@ -187,6 +187,7 @@ struct Settings
M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \
M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown") \
M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \
M(SettingUInt64, use_index_for_in_with_subqueries_max_values, 100000, "Don't use index of a table for filtering by right hand size of the IN operator if the size of set is larger than specified threshold. This allows to avoid performance degradation and higher memory usage due to preparation of additional data structures.") \
\
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \
@ -261,6 +262,9 @@ struct Settings
M(SettingUInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.")\
M(SettingUInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.") \
M(SettingChar, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.") \
M(SettingBool, format_csv_allow_single_quotes, 1, "If it is set to true, allow strings in single quotes.") \
M(SettingBool, format_csv_allow_double_quotes, 1, "If it is set to true, allow strings in double quotes.") \
\
M(SettingUInt64, enable_conditional_computation, 0, "Enable conditional computations") \
\
M(SettingDateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.") \

View File

@ -121,7 +121,7 @@ void sortBlock(Block & block, const SortDescription & description, size_t limit)
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->permute(perm, limit);
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
}
else
{
@ -166,7 +166,7 @@ void sortBlock(Block & block, const SortDescription & description, size_t limit)
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->permute(perm, limit);
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
}
}

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <common/logger_useful.h>
@ -11,59 +11,59 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
AbandonableLockInZooKeeper::AbandonableLockInZooKeeper(
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops)
: zookeeper(&zookeeper_), path_prefix(path_prefix_)
{
String abandonable_path = temp_path + "/abandonable_lock-";
/// The /abandonable_lock- name is for backward compatibility.
String holder_path_prefix = temp_path + "/abandonable_lock-";
/// Let's create an secondary ephemeral node.
if (!precheck_ops || precheck_ops->empty())
{
holder_path = zookeeper->create(abandonable_path, "", zkutil::CreateMode::EphemeralSequential);
holder_path = zookeeper->create(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential);
}
else
{
precheck_ops->emplace_back(zkutil::makeCreateRequest(abandonable_path, "", zkutil::CreateMode::EphemeralSequential));
precheck_ops->emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential));
zkutil::Responses op_results = zookeeper->multi(*precheck_ops);
holder_path = dynamic_cast<const zkutil::CreateResponse &>(*op_results.back()).path_created;
}
/// Write the path to the secondary node in the main node.
path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential);
path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::EphemeralSequential);
if (path.size() <= path_prefix.size())
throw Exception("Logical error: name of sequential node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
}
void AbandonableLockInZooKeeper::unlock()
void EphemeralLockInZooKeeper::unlock()
{
checkCreated();
zookeeper->remove(path);
zookeeper->remove(holder_path);
zkutil::Requests ops;
getUnlockOps(ops);
zookeeper->multi(ops);
holder_path = "";
}
void AbandonableLockInZooKeeper::getUnlockOps(zkutil::Requests & ops)
void EphemeralLockInZooKeeper::getUnlockOps(zkutil::Requests & ops)
{
checkCreated();
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1));
}
AbandonableLockInZooKeeper::~AbandonableLockInZooKeeper()
EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper()
{
if (!zookeeper || holder_path.empty())
if (!isCreated())
return;
try
{
zookeeper->tryRemove(holder_path);
zookeeper->trySet(path, ""); /// It's not strictly necessary.
unlock();
}
catch (...)
{
tryLogCurrentException("~AbandonableLockInZooKeeper");
tryLogCurrentException("~EphemeralLockInZooKeeper");
}
}

View File

@ -13,33 +13,24 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/** The synchronization is primitive. Works as follows:
* Creates a non-ephemeral incremental node and marks it as locked (LOCKED).
* `unlock()` unlocks it (UNLOCKED).
* When the destructor is called or the session ends in ZooKeeper, it goes into the ABANDONED state.
* (Including when the program is halted).
*/
class AbandonableLockInZooKeeper : public boost::noncopyable
/// A class that is used for locking a block number in a partition.
/// It creates a secondary ephemeral node in `temp_path` and a main ephemeral node with `path_prefix`
/// that references the secondary node. The reasons for this two-level scheme are historical (of course
/// it would be simpler to allocate block numbers for all partitions in one ZK directory).
class EphemeralLockInZooKeeper : public boost::noncopyable
{
public:
enum State
{
UNLOCKED,
LOCKED,
ABANDONED,
};
AbandonableLockInZooKeeper(
EphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops = nullptr);
AbandonableLockInZooKeeper() = default;
EphemeralLockInZooKeeper() = default;
AbandonableLockInZooKeeper(AbandonableLockInZooKeeper && rhs) noexcept
EphemeralLockInZooKeeper(EphemeralLockInZooKeeper && rhs) noexcept
{
*this = std::move(rhs);
}
AbandonableLockInZooKeeper & operator=(AbandonableLockInZooKeeper && rhs) noexcept
EphemeralLockInZooKeeper & operator=(EphemeralLockInZooKeeper && rhs) noexcept
{
zookeeper = rhs.zookeeper;
rhs.zookeeper = nullptr;
@ -82,10 +73,10 @@ public:
void checkCreated() const
{
if (!isCreated())
throw Exception("AbandonableLock is not created", ErrorCodes::LOGICAL_ERROR);
throw Exception("EphemeralLock is not created", ErrorCodes::LOGICAL_ERROR);
}
~AbandonableLockInZooKeeper();
~EphemeralLockInZooKeeper();
private:
zkutil::ZooKeeper * zookeeper = nullptr;
@ -95,8 +86,7 @@ private:
};
/// Acquires block number locks in all partitions. The class is called Ephemeral- instead of Abandonable-
/// because it creates ephemeral block nodes (there is no need to leave abandoned tombstones).
/// Acquires block number locks in all partitions.
class EphemeralLocksInAllPartitions
{
public:

View File

@ -269,13 +269,13 @@ KeyCondition::KeyCondition(
const SelectQueryInfo & query_info,
const Context & context,
const NamesAndTypesList & all_columns,
const SortDescription & sort_descr_,
const Names & key_column_names,
const ExpressionActionsPtr & key_expr_)
: sort_descr(sort_descr_), key_expr(key_expr_), prepared_sets(query_info.sets)
: key_expr(key_expr_), prepared_sets(query_info.sets)
{
for (size_t i = 0; i < sort_descr.size(); ++i)
for (size_t i = 0, size = key_column_names.size(); i < size; ++i)
{
std::string name = sort_descr[i].column_name;
std::string name = key_column_names[i];
if (!key_columns.count(name))
key_columns[name] = i;
}
@ -484,14 +484,17 @@ void KeyCondition::getKeyTuplePositionMapping(
}
/// Try to prepare KeyTuplePositionMapping for tuples from IN expression.
bool KeyCondition::isTupleIndexable(
bool KeyCondition::tryPrepareSetIndex(
const ASTPtr & node,
const Context & context,
RPNElement & out,
const SetPtr & prepared_set,
size_t & out_key_column_num)
{
/// The index can be prepared if the elements of the set were saved in advance.
if (!prepared_set->hasExplicitSetElements())
return false;
out_key_column_num = 0;
std::vector<MergeTreeSetIndex::KeyTuplePositionMapping> indexes_mapping;
@ -523,8 +526,7 @@ bool KeyCondition::isTupleIndexable(
if (indexes_mapping.empty())
return false;
out.set_index = std::make_shared<MergeTreeSetIndex>(
prepared_set->getSetElements(), std::move(indexes_mapping));
out.set_index = std::make_shared<MergeTreeSetIndex>(prepared_set->getSetElements(), std::move(indexes_mapping));
return true;
}
@ -636,13 +638,13 @@ bool KeyCondition::atomFromAST(const ASTPtr & node, const Context & context, Blo
DataTypePtr key_expr_type; /// Type of expression containing key column
size_t key_arg_pos; /// Position of argument with key column (non-const argument)
size_t key_column_num; /// Number of a key column (inside sort_descr array)
size_t key_column_num; /// Number of a key column (inside key_column_names array)
MonotonicFunctionsChain chain;
bool is_set_const = false;
bool is_constant_transformed = false;
if (prepared_sets.count(args[1]->range)
&& isTupleIndexable(args[0], context, out, prepared_sets[args[1]->range], key_column_num))
&& tryPrepareSetIndex(args[0], context, out, prepared_sets[args[1]->range], key_column_num))
{
key_arg_pos = 0;
is_set_const = true;

View File

@ -189,6 +189,7 @@ public:
String toString() const;
};
/// Class that extends arbitrary objects with infinities, like +-inf for floats
class FieldWithInfinity
{
@ -216,6 +217,7 @@ private:
FieldWithInfinity(const Type type_);
};
/** Condition on the index.
*
* Consists of the conditions for the key belonging to all possible ranges or sets,
@ -232,7 +234,7 @@ public:
const SelectQueryInfo & query_info,
const Context & context,
const NamesAndTypesList & all_columns,
const SortDescription & sort_descr,
const Names & key_column_names,
const ExpressionActionsPtr & key_expr);
/// Whether the condition is feasible in the key range.
@ -324,8 +326,8 @@ private:
public:
static const AtomMap atom_map;
private:
private:
bool mayBeTrueInRange(
size_t used_key_size,
const Field * left_key,
@ -370,7 +372,10 @@ private:
const size_t tuple_index,
size_t & out_key_column_num);
bool isTupleIndexable(
/// If it's possible to make an RPNElement
/// that will filter values (possibly tuples) by the content of 'prepared_set',
/// do it and return true.
bool tryPrepareSetIndex(
const ASTPtr & node,
const Context & context,
RPNElement & out,
@ -379,7 +384,6 @@ private:
RPN rpn;
SortDescription sort_descr;
ColumnIndices key_columns;
ExpressionActionsPtr key_expr;
PreparedSets prepared_sets;

View File

@ -159,8 +159,9 @@ MergeTreeData::MergeTreeData(
Poco::File(full_path + "detached").createDirectory();
String version_file_path = full_path + "format_version.txt";
// When data path not exists, ignore the format_version check
if (!attach || !path_exists)
auto version_file_exists = Poco::File(version_file_path).exists();
// When data path or file not exists, ignore the format_version check
if (!attach || !path_exists || !version_file_exists)
{
format_version = min_format_version;
WriteBufferFromFile buf(version_file_path);
@ -214,19 +215,16 @@ static void checkKeyExpression(const ExpressionActions & expr, const Block & sam
void MergeTreeData::initPrimaryKey()
{
auto addSortDescription = [](SortDescription & descr, const ASTPtr & expr_ast)
auto addSortColumns = [](Names & out, const ASTPtr & expr_ast)
{
descr.reserve(descr.size() + expr_ast->children.size());
out.reserve(out.size() + expr_ast->children.size());
for (const ASTPtr & ast : expr_ast->children)
{
String name = ast->getColumnName();
descr.emplace_back(name, 1, 1);
}
out.emplace_back(ast->getColumnName());
};
/// Initialize description of sorting for primary key.
primary_sort_descr.clear();
addSortDescription(primary_sort_descr, primary_expr_ast);
primary_sort_columns.clear();
addSortColumns(primary_sort_columns, primary_expr_ast);
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumns().getAllPhysical()).getActions(false);
@ -243,10 +241,10 @@ void MergeTreeData::initPrimaryKey()
for (size_t i = 0; i < primary_key_size; ++i)
primary_key_data_types[i] = primary_key_sample.getByPosition(i).type;
sort_descr = primary_sort_descr;
sort_columns = primary_sort_columns;
if (secondary_sort_expr_ast)
{
addSortDescription(sort_descr, secondary_sort_expr_ast);
addSortColumns(sort_columns, secondary_sort_expr_ast);
secondary_sort_expr = ExpressionAnalyzer(secondary_sort_expr_ast, context, nullptr, getColumns().getAllPhysical()).getActions(false);
ExpressionActionsPtr projected_expr =
@ -279,7 +277,6 @@ void MergeTreeData::initPartitionKey()
{
minmax_idx_columns.emplace_back(column.name);
minmax_idx_column_types.emplace_back(column.type);
minmax_idx_sort_descr.emplace_back(column.name, 1, 1);
}
/// Try to find the date column in columns used by the partition key (a common case).
@ -2282,14 +2279,14 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
bool MergeTreeData::isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const
{
String column_name = node->getColumnName();
const String column_name = node->getColumnName();
for (const auto & column : primary_sort_descr)
if (column_name == column.column_name)
for (const auto & name : primary_sort_columns)
if (column_name == name)
return true;
for (const auto & column : minmax_idx_sort_descr)
if (column_name == column.column_name)
for (const auto & name : minmax_idx_columns)
if (column_name == name)
return true;
if (const ASTFunction * func = typeid_cast<const ASTFunction *>(node.get()))

View File

@ -1,6 +1,5 @@
#pragma once
#include <Core/SortDescription.h>
#include <Common/SimpleIncrement.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
@ -480,11 +479,11 @@ public:
broken_part_callback(name);
}
bool hasPrimaryKey() const { return !primary_sort_descr.empty(); }
bool hasPrimaryKey() const { return !primary_sort_columns.empty(); }
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
ExpressionActionsPtr getSecondarySortExpression() const { return secondary_sort_expr; } /// may return nullptr
SortDescription getPrimarySortDescription() const { return primary_sort_descr; }
SortDescription getSortDescription() const { return sort_descr; }
Names getPrimarySortColumns() const { return primary_sort_columns; }
Names getSortColumns() const { return sort_columns; }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
@ -555,7 +554,6 @@ public:
Names minmax_idx_columns;
DataTypes minmax_idx_column_types;
Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
SortDescription minmax_idx_sort_descr; /// For use with KeyCondition.
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};
@ -576,10 +574,10 @@ private:
ExpressionActionsPtr primary_expr;
/// Additional expression for sorting (of rows with the same primary keys).
ExpressionActionsPtr secondary_sort_expr;
/// Sort description for primary key. Is the prefix of sort_descr.
SortDescription primary_sort_descr;
/// Sort description for primary key + secondary sorting columns.
SortDescription sort_descr;
/// Names of columns for primary key. Is the prefix of sort_columns.
Names primary_sort_columns;
/// Names of columns for primary key + secondary sorting columns.
Names sort_columns;
String database_name;
String table_name;

View File

@ -549,7 +549,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
Names all_column_names = data.getColumns().getNamesOfPhysical();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const SortDescription sort_desc = data.getSortDescription();
NamesAndTypesList gathering_columns, merging_columns;
Names gathering_column_names, merging_column_names;
@ -611,6 +610,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
src_streams.emplace_back(std::move(input));
}
Names sort_columns = data.getSortColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Block header = src_streams.at(0)->getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
/// that is going in insertion order.
@ -620,38 +628,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, rows_sources_write_buf.get(), true);
src_streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE, 0, rows_sources_write_buf.get(), true);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
break;
case MergeTreeData::MergingParams::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_description, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
src_streams, sort_description, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
break;
case MergeTreeData::MergingParams::Graphite:
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE,
src_streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE,
data.merging_params.graphite_params, time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
break;
default:
@ -927,12 +935,12 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
* When M > N parts could be replaced?
* - new block was added in ReplicatedMergeTreeBlockOutputStream;
* - it was added to working dataset in memory and renamed on filesystem;
* - but ZooKeeper transaction that add its to reference dataset in ZK and unlocks AbandonableLock is failed;
* - but ZooKeeper transaction that adds it to reference dataset in ZK failed;
* - and it is failed due to connection loss, so we don't rollback working dataset in memory,
* because we don't know if the part was added to ZK or not
* (see ReplicatedMergeTreeBlockOutputStream)
* - then method selectPartsToMerge selects a range and see, that AbandonableLock for this part is abandoned,
* and so, it is possible to merge a range skipping this part.
* - then method selectPartsToMerge selects a range and sees, that EphemeralLock for the block in this part is unlocked,
* and so it is possible to merge a range skipping this part.
* (NOTE: Merging with part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)
* - and after merge, this part will be removed in addition to parts that was merged.
*/

View File

@ -448,7 +448,7 @@ void MergeTreeDataPart::loadIndex()
.getSize() / MERGE_TREE_MARK_SIZE;
}
size_t key_size = storage.primary_sort_descr.size();
size_t key_size = storage.primary_sort_columns.size();
if (key_size)
{
@ -630,7 +630,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (!checksums.empty())
{
if (!storage.primary_sort_descr.empty() && !checksums.files.count("primary.idx"))
if (!storage.primary_sort_columns.empty() && !checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (require_part_metadata)
@ -683,7 +683,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
};
/// Check that the primary key index is not empty.
if (!storage.primary_sort_descr.empty())
if (!storage.primary_sort_columns.empty())
check_file_not_empty(path + "primary.idx");
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)

View File

@ -196,17 +196,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
processed_stage = QueryProcessingStage::FetchColumns;
const Settings & settings = context.getSettingsRef();
SortDescription sort_descr = data.getPrimarySortDescription();
Names primary_sort_columns = data.getPrimarySortColumns();
KeyCondition key_condition(query_info, context, available_real_and_virtual_columns, sort_descr,
data.getPrimaryExpression());
KeyCondition key_condition(
query_info, context, available_real_and_virtual_columns,
primary_sort_columns, data.getPrimaryExpression());
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
{
std::stringstream exception_message;
exception_message << "Primary key (";
for (size_t i = 0, size = sort_descr.size(); i < size; ++i)
exception_message << (i == 0 ? "" : ", ") << sort_descr[i].column_name;
for (size_t i = 0, size = primary_sort_columns.size(); i < size; ++i)
exception_message << (i == 0 ? "" : ", ") << primary_sort_columns[i];
exception_message << ") is not used and setting 'force_primary_key' is set.";
throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED);
@ -217,7 +218,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
{
minmax_idx_condition.emplace(
query_info, context, available_real_and_virtual_columns,
data.minmax_idx_sort_descr, data.minmax_idx_expr);
data.minmax_idx_columns, data.minmax_idx_expr);
if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue())
{
@ -781,36 +782,44 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));
}
BlockInputStreamPtr merged;
Names sort_columns = data.getSortColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Block header = to_merge.at(0)->getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
BlockInputStreamPtr merged;
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, sort_description, max_block_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged = std::make_shared<CollapsingFinalBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column);
to_merge, sort_description, data.merging_params.sign_column);
break;
case MergeTreeData::MergingParams::Summing:
merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
sort_description, data.merging_params.columns_to_sum, max_block_size);
break;
case MergeTreeData::MergingParams::Aggregating:
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, sort_description, max_block_size);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.version_column, max_block_size);
sort_description, data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
to_merge, sort_description, data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Graphite:

View File

@ -183,7 +183,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
secondary_sort_expr->execute(block);
}
SortDescription sort_descr = data.getSortDescription();
Names sort_columns = data.getSortColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
@ -192,9 +198,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
IColumn::Permutation perm;
if (data.hasPrimaryKey())
{
if (!isAlreadySorted(block, sort_descr))
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_descr, perm);
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else

View File

@ -12,6 +12,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/NestedUtils.h>
#include <ext/scope_guard.h>
#include <ext/collection_cast.h>
#include <ext/map.h>
#include <memory>
#include <unordered_map>
@ -39,8 +40,7 @@ MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
const MergeTreeData & data,
const Names & column_names,
Logger * log)
: primary_key_columns{ext::map<std::unordered_set>(data.getPrimarySortDescription(),
[] (const SortColumnDescription & col) { return col.column_name; })},
: primary_key_columns{ext::collection_cast<std::unordered_set>(data.getPrimarySortColumns())},
table_columns{ext::map<std::unordered_set>(data.getColumns().getAllPhysical(),
[] (const NameAndTypePair & col) { return col.name; })},
block_with_constants{KeyCondition::getBlockWithConstants(query_info.query, context, data.getColumns().getAllPhysical())},

View File

@ -365,26 +365,20 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
OffsetColumns offset_columns;
auto sort_description = storage.getPrimarySortDescription();
auto sort_columns = storage.getPrimarySortColumns();
/// Here we will add the columns related to the Primary Key, then write the index.
std::vector<ColumnWithTypeAndName> primary_columns(sort_description.size());
std::vector<ColumnWithTypeAndName> primary_columns(sort_columns.size());
std::map<String, size_t> primary_columns_name_to_position;
for (size_t i = 0, size = sort_description.size(); i < size; ++i)
for (size_t i = 0, size = sort_columns.size(); i < size; ++i)
{
const auto & descr = sort_description[i];
String name = !descr.column_name.empty()
? descr.column_name
: block.safeGetByPosition(descr.column_number).name;
const auto & name = sort_columns[i];
if (!primary_columns_name_to_position.emplace(name, i).second)
throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);
primary_columns[i] = !descr.column_name.empty()
? block.getByName(descr.column_name)
: block.safeGetByPosition(descr.column_number);
primary_columns[i] = block.getByName(name);
/// Reorder primary key columns in advance and add them to `primary_columns`.
if (permutation)
@ -393,8 +387,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
if (index_columns.empty())
{
index_columns.resize(sort_description.size());
for (size_t i = 0, size = sort_description.size(); i < size; ++i)
index_columns.resize(sort_columns.size());
for (size_t i = 0, size = sort_columns.size(); i < size; ++i)
index_columns[i] = primary_columns[i].column->cloneEmpty();
}

View File

@ -1,5 +1,5 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>

View File

@ -1266,15 +1266,19 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
prev_virtual_parts = queue.virtual_parts;
}
/// Load current quorum status.
auto quorum_last_part_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/last_part");
auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status");
/// Load current inserts
std::unordered_set<String> abandonable_lock_holders;
std::unordered_set<String> lock_holder_paths;
for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp"))
{
if (startsWith(entry, "abandonable_lock-"))
abandonable_lock_holders.insert(queue.zookeeper_path + "/temp/" + entry);
lock_holder_paths.insert(queue.zookeeper_path + "/temp/" + entry);
}
if (!abandonable_lock_holders.empty())
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(queue.zookeeper_path + "/block_numbers");
std::vector<std::future<zkutil::ListResponse>> lock_futures;
@ -1310,21 +1314,22 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
for (BlockInfo & block : block_infos)
{
zkutil::GetResponse resp = block.contents_future.get();
if (!resp.error && abandonable_lock_holders.count(resp.data))
if (!resp.error && lock_holder_paths.count(resp.data))
committing_blocks[block.partition].insert(block.number);
}
}
queue_.pullLogsToQueue(zookeeper);
/// Load current quorum status.
zookeeper->tryGet(queue.zookeeper_path + "/quorum/last_part", last_quorum_part);
zkutil::GetResponse quorum_last_part_response = quorum_last_part_future.get();
if (!quorum_last_part_response.error)
last_quorum_part = quorum_last_part_response.data;
String quorum_status_str;
if (zookeeper->tryGet(queue.zookeeper_path + "/quorum/status", quorum_status_str))
zkutil::GetResponse quorum_status_response = quorum_status_future.get();
if (!quorum_status_response.error)
{
ReplicatedMergeTreeQuorumEntry quorum_status;
quorum_status.fromString(quorum_status_str);
quorum_status.fromString(quorum_status_response.data);
inprogress_quorum_part = quorum_status.part_name;
}
else
@ -1338,7 +1343,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
/// A sketch of a proof of why this method actually works:
///
/// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right.
/// Inserted parts get their block numbers by acquiring an abandonable lock (see AbandonableLockInZooKeeper.h).
/// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h).
/// These block numbers are monotonically increasing in a partition.
///
/// Because there is a window between the moment the inserted part gets its block number and

View File

@ -45,6 +45,11 @@ void MutationCommands::validate(const IStorage & table, const Context & context)
case MutationCommand::DELETE:
{
auto actions = ExpressionAnalyzer(command.predicate, context, {}, all_columns).getActions(true);
/// Try executing the resulting actions on the table sample block to detect malformed queries.
auto table_sample_block = table.getSampleBlock();
actions->execute(table_sample_block);
const ColumnWithTypeAndName & predicate_column = actions->getSampleBlock().getByName(
command.predicate->getColumnName());
checkColumnCanBeUsedAsFilter(predicate_column);

View File

@ -3,8 +3,9 @@
#include <Databases/IDatabase.h>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/StorageDistributed.h>
#include <Storages/VirtualColumnFactory.h>
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageFactory.h>
@ -57,6 +58,7 @@ namespace ErrorCodes
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int INFINITE_LOOP;
extern const int TYPE_MISMATCH;
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
@ -316,18 +318,36 @@ void StorageDistributed::truncate(const ASTPtr &)
}
}
namespace
{
/// NOTE This is weird. Get rid of this.
std::map<String, String> virtual_columns =
{
{"_table", "String"},
{"_part", "String"},
{"_part_index", "UInt64"},
{"_sample_factor", "Float64"},
};
}
NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
{
if (const auto & type = VirtualColumnFactory::tryGetType(column_name))
return { column_name, type };
if (getColumns().hasPhysical(column_name))
return getColumns().getPhysical(column_name);
return getColumns().getPhysical(column_name);
auto it = virtual_columns.find(column_name);
if (it != virtual_columns.end())
return { it->first, DataTypeFactory::instance().get(it->second) };
throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
bool StorageDistributed::hasColumn(const String & column_name) const
{
return VirtualColumnFactory::hasColumn(column_name) || getColumns().hasPhysical(column_name);
return virtual_columns.count(column_name) || getColumns().hasPhysical(column_name);
}
void StorageDistributed::createDirectoryMonitors()

View File

@ -11,8 +11,6 @@
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnFactory.h>
#include <Common/typeid_cast.h>

View File

@ -9,7 +9,6 @@
#include <Storages/StorageMerge.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/VirtualColumnFactory.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -32,6 +31,7 @@ namespace ErrorCodes
extern const int ILLEGAL_PREWHERE;
extern const int INCOMPATIBLE_SOURCE_TABLES;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
@ -48,22 +48,44 @@ StorageMerge::StorageMerge(
}
/// NOTE Structure of underlying tables as well as their set are not constant,
/// so the results of these methods may become obsolete after the call.
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{
auto type = VirtualColumnFactory::tryGetType(column_name);
if (type)
return NameAndTypePair(column_name, type);
/// virtual column of the Merge table itself
if (column_name == "_table")
return { column_name, std::make_shared<DataTypeString>() };
return IStorage::getColumn(column_name);
if (IStorage::hasColumn(column_name))
return IStorage::getColumn(column_name);
/// virtual (and real) columns of the underlying tables
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->getColumn(column_name);
throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
bool StorageMerge::hasColumn(const String & column_name) const
{
return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name);
if (column_name == "_table")
return true;
if (IStorage::hasColumn(column_name))
return true;
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->hasColumn(column_name);
return false;
}
bool StorageMerge::isRemote() const
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
auto database = context.getDatabase(source_database);
auto iterator = database->getIterator(context);
@ -73,14 +95,21 @@ bool StorageMerge::isRemote() const
if (table_name_regexp.match(iterator->name()))
{
auto & table = iterator->table();
if (table.get() != this && table->isRemote())
return true;
if (table.get() != this && predicate(table))
return table;
}
iterator->next();
}
return false;
return {};
}
bool StorageMerge::isRemote() const
{
auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); });
return first_remote_table != nullptr;
}

View File

@ -58,6 +58,9 @@ private:
Block getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;
protected:
StorageMerge(
const std::string & name_,

View File

@ -479,11 +479,32 @@ bool StorageMergeTree::optimize(
partition_id = data.getPartitionIDFromQuery(partition, context);
String disable_reason;
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate, &disable_reason))
if (!partition && final)
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
std::unordered_set<String> partition_ids;
for (const MergeTreeData::DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
for (const String & partition_id : partition_ids)
{
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, true, deduplicate, &disable_reason))
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
}
}
}
else
{
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate, &disable_reason))
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
}
}
return true;

View File

@ -106,6 +106,7 @@ namespace ErrorCodes
extern const int INCORRECT_FILE_NAME;
extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int KEEPER_EXCEPTION;
extern const int BAD_ARGUMENTS;
}
namespace ActionLocks
@ -2062,7 +2063,10 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
{
restarting_thread->wakeup();
return;
}
queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
@ -2279,14 +2283,20 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry)
{
bool all_in_zk = true;
std::vector<std::future<zkutil::ExistsResponse>> exists_futures;
exists_futures.reserve(parts.size());
for (const auto & part : parts)
exists_futures.emplace_back(zookeeper->asyncExists(replica_path + "/parts/" + part->name));
bool all_in_zk = true;
for (size_t i = 0; i < parts.size(); ++i)
{
/// If there is no information about part in ZK, we will not merge it.
if (!zookeeper->exists(replica_path + "/parts/" + part->name))
if (exists_futures[i].get().error == ZooKeeperImpl::ZooKeeper::ZNONODE)
{
all_in_zk = false;
const auto & part = parts[i];
if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
{
LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
@ -2297,6 +2307,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
}
}
}
if (!all_in_zk)
return false;
@ -2313,16 +2324,6 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
const String & partition_id = parts[0]->info.partition_id;
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Remove the unnecessary entries about non-existent blocks.
for (Int64 number = parts[i]->info.max_block + 1; number <= parts[i + 1]->info.min_block - 1; ++number)
{
zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + partition_id + "/block-" + padIndex(number));
}
}
if (out_log_entry)
*out_log_entry = entry;
@ -2914,6 +2915,9 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
if (!partition)
{
if (final)
throw Exception("FINAL flag for OPTIMIZE query on Replicated table is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS);
selected = merger_mutator.selectPartsToMerge(
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
}
@ -3404,7 +3408,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
}
std::optional<AbandonableLockInZooKeeper>
std::optional<EphemeralLockInZooKeeper>
StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path)
{
@ -3434,11 +3438,11 @@ StorageReplicatedMergeTree::allocateBlockNumber(
zkutil::KeeperMultiException::check(code, ops, responses);
}
AbandonableLockInZooKeeper lock;
EphemeralLockInZooKeeper lock;
/// 2 RTT
try
{
lock = AbandonableLockInZooKeeper(
lock = EphemeralLockInZooKeeper(
partition_path + "/block-", zookeeper_path + "/temp", *zookeeper, &deduplication_check_ops);
}
catch (const zkutil::KeeperMultiException & e)
@ -4375,7 +4379,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
MergeTreeData::MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<AbandonableLockInZooKeeper> abandonable_locks;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
@ -4431,7 +4435,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
abandonable_locks.emplace_back(std::move(*lock));
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
@ -4472,7 +4476,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
for (size_t i = 0; i < dst_parts.size(); ++i)
{
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
abandonable_locks[i].getUnlockOps(ops);
ephemeral_locks[i].getUnlockOps(ops);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
@ -4513,7 +4517,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
String log_znode_path = dynamic_cast<const zkutil::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : abandonable_locks)
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper

View File

@ -14,7 +14,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAlterThread.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
@ -460,8 +460,9 @@ private:
void updateQuorum(const String & part_name);
/// Creates new block number if block with such block_id does not exist
std::optional<AbandonableLockInZooKeeper> allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "");
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "");
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica .

View File

@ -105,7 +105,7 @@ StorageSet::StorageSet(
const String & name_,
const ColumnsDescription & columns_)
: StorageSetOrJoinBase{path_, name_, columns_},
set(std::make_shared<Set>(SizeLimits()))
set(std::make_shared<Set>(SizeLimits(), false))
{
Block header = getSampleBlock();
header = header.sortColumns();
@ -115,7 +115,7 @@ StorageSet::StorageSet(
}
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block, /*fill_set_elements=*/false); }
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); }
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
@ -129,7 +129,7 @@ void StorageSet::truncate(const ASTPtr &)
header = header.sortColumns();
increment = 0;
set = std::make_shared<Set>(SizeLimits());
set = std::make_shared<Set>(SizeLimits(), false);
set->setHeader(header);
};

View File

@ -245,6 +245,10 @@ void TinyLogBlockOutputStream::writeSuffix()
return;
done = true;
/// If nothing was written - leave the table in initial state.
if (streams.empty())
return;
/// Finish write.
for (auto & stream : streams)
stream.second->finalize();

View File

@ -22,95 +22,6 @@ namespace ErrorCodes
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
}
/// Some virtual columns routines
namespace
{
bool hasColumn(const ColumnsWithTypeAndName & columns, const String & column_name)
{
for (const auto & column : columns)
{
if (column.name == column_name)
return true;
}
return false;
}
NameAndTypePair tryGetColumn(const ColumnsWithTypeAndName & columns, const String & column_name)
{
for (const auto & column : columns)
{
if (column.name == column_name)
return {column.name, column.type};
}
return {};
}
struct VirtualColumnsProcessor
{
explicit VirtualColumnsProcessor(const ColumnsWithTypeAndName & all_virtual_columns_)
: all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0) {}
/// Separates real and virtual column names, returns real ones
Names process(const Names & column_names, const std::vector<bool *> & virtual_columns_exists_flag = {})
{
Names real_column_names;
if (!virtual_columns_exists_flag.empty())
{
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
*virtual_columns_exists_flag.at(i) = false;
}
for (const String & column_name : column_names)
{
ssize_t virtual_column_index = -1;
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
{
if (column_name == all_virtual_columns[i].name)
{
virtual_column_index = i;
break;
}
}
if (virtual_column_index >= 0)
{
auto index = static_cast<size_t>(virtual_column_index);
virtual_columns_mask[index] = 1;
if (!virtual_columns_exists_flag.empty())
*virtual_columns_exists_flag.at(index) = true;
}
else
{
real_column_names.emplace_back(column_name);
}
}
return real_column_names;
}
void appendVirtualColumns(Block & block)
{
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
{
if (virtual_columns_mask[i])
block.insert(all_virtual_columns[i].cloneEmpty());
}
}
protected:
const ColumnsWithTypeAndName & all_virtual_columns;
std::vector<UInt8> virtual_columns_mask;
};
}
StorageSystemTables::StorageSystemTables(const std::string & name_)
: name(name_)
@ -123,14 +34,10 @@ StorageSystemTables::StorageSystemTables(const std::string & name_)
{"is_temporary", std::make_shared<DataTypeUInt8>()},
{"data_path", std::make_shared<DataTypeString>()},
{"metadata_path", std::make_shared<DataTypeString>()},
{"metadata_modification_time", std::make_shared<DataTypeDateTime>()},
{"create_table_query", std::make_shared<DataTypeString>()},
{"engine_full", std::make_shared<DataTypeString>()}
}));
virtual_columns =
{
{std::make_shared<DataTypeDateTime>(), "metadata_modification_time"},
{std::make_shared<DataTypeString>(), "create_table_query"},
{std::make_shared<DataTypeString>(), "engine_full"}
};
}
@ -156,17 +63,24 @@ BlockInputStreams StorageSystemTables::read(
{
processed_stage = QueryProcessingStage::FetchColumns;
Names real_column_names;
bool has_metadata_modification_time = false;
bool has_create_table_query = false;
bool has_engine_full = false;
check(column_names);
VirtualColumnsProcessor virtual_columns_processor(virtual_columns);
real_column_names = virtual_columns_processor.process(column_names, {&has_metadata_modification_time, &has_create_table_query, &has_engine_full});
check(real_column_names);
/// Create a mask of what columns are needed in the result.
Block res_block = getSampleBlock();
virtual_columns_processor.appendVirtualColumns(res_block);
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = getSampleBlock();
Block res_block;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.count(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
res_block.insert(sample_block.getByPosition(i));
}
}
MutableColumns res_columns = res_block.cloneEmptyColumns();
@ -188,25 +102,38 @@ BlockInputStreams StorageSystemTables::read(
{
auto table_name = iterator->name();
size_t j = 0;
res_columns[j++]->insert(database_name);
res_columns[j++]->insert(table_name);
res_columns[j++]->insert(iterator->table()->getName());
res_columns[j++]->insert(UInt64(0));
res_columns[j++]->insert(iterator->table()->getDataPath());
res_columns[j++]->insert(database->getTableMetadataPath(table_name));
size_t src_index = 0;
size_t res_index = 0;
if (has_metadata_modification_time)
res_columns[j++]->insert(static_cast<UInt64>(database->getTableMetadataModificationTime(context, table_name)));
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database_name);
if (has_create_table_query || has_engine_full)
if (columns_mask[src_index++])
res_columns[res_index++]->insert(table_name);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(iterator->table()->getName());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(UInt64(0));
if (columns_mask[src_index++])
res_columns[res_index++]->insert(iterator->table()->getDataPath());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getTableMetadataPath(table_name));
if (columns_mask[src_index++])
res_columns[res_index++]->insert(static_cast<UInt64>(database->getTableMetadataModificationTime(context, table_name)));
if (columns_mask[src_index] || columns_mask[src_index + 1])
{
ASTPtr ast = database->tryGetCreateTableQuery(context, table_name);
if (has_create_table_query)
res_columns[j++]->insert(ast ? queryToString(ast) : "");
if (columns_mask[src_index++])
res_columns[res_index++]->insert(ast ? queryToString(ast) : "");
if (has_engine_full)
if (columns_mask[src_index++])
{
String engine_full;
@ -223,34 +150,49 @@ BlockInputStreams StorageSystemTables::read(
}
}
res_columns[j++]->insert(engine_full);
res_columns[res_index++]->insert(engine_full);
}
}
}
}
/// This is for temporary tables.
if (context.hasSessionContext())
{
Tables external_tables = context.getSessionContext().getExternalTables();
for (auto table : external_tables)
{
size_t j = 0;
res_columns[j++]->insertDefault();
res_columns[j++]->insert(table.first);
res_columns[j++]->insert(table.second->getName());
res_columns[j++]->insert(UInt64(1));
res_columns[j++]->insertDefault();
res_columns[j++]->insertDefault();
size_t src_index = 0;
size_t res_index = 0;
if (has_metadata_modification_time)
res_columns[j++]->insertDefault();
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
if (has_create_table_query)
res_columns[j++]->insertDefault();
if (columns_mask[src_index++])
res_columns[res_index++]->insert(table.first);
if (has_engine_full)
res_columns[j++]->insert(table.second->getName());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(table.second->getName());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(UInt64(1));
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
res_columns[res_index++]->insert(table.second->getName());
}
}
@ -258,15 +200,4 @@ BlockInputStreams StorageSystemTables::read(
return {std::make_shared<OneBlockInputStream>(res_block)};
}
bool StorageSystemTables::hasColumn(const String & column_name) const
{
return DB::hasColumn(virtual_columns, column_name) || ITableDeclaration::hasColumn(column_name);
}
NameAndTypePair StorageSystemTables::getColumn(const String & column_name) const
{
auto virtual_column = DB::tryGetColumn(virtual_columns, column_name);
return !virtual_column.name.empty() ? virtual_column : ITableDeclaration::getColumn(column_name);
}
}

View File

@ -26,15 +26,9 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool hasColumn(const String & column_name) const override;
NameAndTypePair getColumn(const String & column_name) const override;
private:
const std::string name;
ColumnsWithTypeAndName virtual_columns;
protected:
StorageSystemTables(const std::string & name_);
};

View File

@ -1,37 +0,0 @@
#include <Storages/VirtualColumnFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
DataTypePtr VirtualColumnFactory::getType(const String & name)
{
auto res = tryGetType(name);
if (!res)
throw Exception("There is no column " + name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
return res;
}
bool VirtualColumnFactory::hasColumn(const String & name)
{
return !!tryGetType(name);
}
DataTypePtr VirtualColumnFactory::tryGetType(const String & name)
{
if (name == "_table") return std::make_shared<DataTypeString>();
if (name == "_part") return std::make_shared<DataTypeString>();
if (name == "_part_index") return std::make_shared<DataTypeUInt64>();
if (name == "_sample_factor") return std::make_shared<DataTypeFloat64>();
return nullptr;
}
}

View File

@ -1,20 +0,0 @@
#pragma once
#include <DataTypes/IDataType.h>
namespace DB
{
/** Knows the names and types of all possible virtual columns.
* It is necessary for engines that redirect a request to other tables without knowing in advance what virtual columns they contain.
*/
class VirtualColumnFactory
{
public:
static bool hasColumn(const String & name);
static DataTypePtr getType(const String & name);
static DataTypePtr tryGetType(const String & name);
};
}

View File

@ -4,7 +4,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>

View File

@ -4,7 +4,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>
@ -37,17 +37,17 @@ try
Stopwatch total;
Stopwatch stage;
/// Load current inserts
std::unordered_set<String> abandonable_lock_holders;
std::unordered_set<String> lock_holder_paths;
for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp"))
{
if (startsWith(entry, "abandonable_lock-"))
abandonable_lock_holders.insert(zookeeper_path + "/temp/" + entry);
lock_holder_paths.insert(zookeeper_path + "/temp/" + entry);
}
std::cerr << "Stage 1 (get lock holders): " << abandonable_lock_holders.size()
std::cerr << "Stage 1 (get lock holders): " << lock_holder_paths.size()
<< " lock holders, elapsed: " << stage.elapsedSeconds() << "s." << std::endl;
stage.restart();
if (!abandonable_lock_holders.empty())
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
std::cerr << "Stage 2 (get partitions): " << partitions.size()
@ -86,7 +86,7 @@ try
for (BlockInfo & block : block_infos)
{
zkutil::GetResponse resp = block.contents_future.get();
if (!resp.error && abandonable_lock_holders.count(resp.data))
if (!resp.error && lock_holder_paths.count(resp.data))
{
++total_count;
current_inserts[block.partition].insert(block.number);

View File

@ -26,8 +26,7 @@ try
names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
names_and_types.emplace_back("b", std::make_shared<DataTypeUInt8>());
StoragePtr table = StorageLog::create(
"./", "test", ColumnsDescription{names_and_types}, DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
StoragePtr table = StorageLog::create("./", "test", ColumnsDescription{names_and_types}, 1048576);
table->startup();
/// write into it

View File

@ -1,6 +1,5 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/ITableFunctionFileLike.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/Exception.h>

View File

@ -134,15 +134,19 @@ def test_insert_multithreaded(started_cluster):
# Sanity check: at least something was inserted
assert runner.total_inserted > 0
for i in range(30): # wait for replication 3 seconds max
all_replicated = False
for i in range(50): # wait for replication 5 seconds max
time.sleep(0.1)
def get_delay(node):
return int(node.query("SELECT absolute_delay FROM system.replicas WHERE table = 'repl_test'").rstrip())
if all([get_delay(n) == 0 for n in nodes]):
all_replicated = True
break
assert all_replicated
actual_inserted = []
for i, node in enumerate(nodes):
actual_inserted.append(int(node.query("SELECT sum(x) FROM repl_test").rstrip()))

View File

@ -0,0 +1,14 @@
1 6 3 3
1 6 3 3
1 6 [3,2]
1 6 [3,2]
1 0.5
1 0.5
1 0.1
1 0.1
0 333333 53
1 333333 53
2 333333 53
0 333333 53
1 333333 53
2 333333 53

View File

@ -0,0 +1,115 @@
drop table if exists test.summing_merge_tree_aggregate_function;
drop table if exists test.summing_merge_tree_null;
---- sum + uniq + uniqExact
create table test.summing_merge_tree_aggregate_function (
d materialized today(),
k UInt64,
c UInt64,
u AggregateFunction(uniq, UInt8),
ue AggregateFunction(uniqExact, UInt8)
) engine=SummingMergeTree(d, k, 8192);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(1), uniqExactState(1);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(2), uniqExactState(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(3), uniqExactState(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(1), uniqExactState(1);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(2), uniqExactState(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(3), uniqExactState(3);
select
k, sum(c),
uniqMerge(u), uniqExactMerge(ue)
from test.summing_merge_tree_aggregate_function group by k;
optimize table test.summing_merge_tree_aggregate_function;
select
k, sum(c),
uniqMerge(u), uniqExactMerge(ue)
from test.summing_merge_tree_aggregate_function group by k;
drop table test.summing_merge_tree_aggregate_function;
---- sum + topK
create table test.summing_merge_tree_aggregate_function (d materialized today(), k UInt64, c UInt64, x AggregateFunction(topK(2), UInt8)) engine=SummingMergeTree(d, k, 8192);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(1);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(3);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(3);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(3);
select k, sum(c), topKMerge(2)(x) from test.summing_merge_tree_aggregate_function group by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, sum(c), topKMerge(2)(x) from test.summing_merge_tree_aggregate_function group by k;
drop table test.summing_merge_tree_aggregate_function;
---- avg
create table test.summing_merge_tree_aggregate_function (d materialized today(), k UInt64, x AggregateFunction(avg, Float64)) engine=SummingMergeTree(d, k, 8192);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.0);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.1);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.2);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.3);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.4);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.5);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.6);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.7);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.8);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.9);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(1.0);
select k, avgMerge(x) from test.summing_merge_tree_aggregate_function group by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, avgMerge(x) from test.summing_merge_tree_aggregate_function group by k;
drop table test.summing_merge_tree_aggregate_function;
---- quantile
create table test.summing_merge_tree_aggregate_function (d materialized today(), k UInt64, x AggregateFunction(quantile(0.1), Float64)) engine=SummingMergeTree(d, k, 8192);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.0);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.1);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.2);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.3);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.4);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.5);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.6);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.7);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.8);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.9);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(1.0);
select k, quantileMerge(0.1)(x) from test.summing_merge_tree_aggregate_function group by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, quantileMerge(0.1)(x) from test.summing_merge_tree_aggregate_function group by k;
drop table test.summing_merge_tree_aggregate_function;
---- sum + uniq with more data
create table test.summing_merge_tree_null (
d materialized today(),
k UInt64,
c UInt64,
u UInt64
) engine=Null;
create materialized view test.summing_merge_tree_aggregate_function (
d materialized today(),
k UInt64,
c UInt64,
u AggregateFunction(uniq, UInt64)
) engine=SummingMergeTree(d, k, 8192)
as select d, k, sum(c) as c, uniqState(u) as u
from test.summing_merge_tree_null
group by d, k;
-- prime number 53 to avoid resonanse between %3 and %53
insert into test.summing_merge_tree_null select number % 3, 1, number % 53 from numbers(999999);
select k, sum(c), uniqMerge(u) from test.summing_merge_tree_aggregate_function group by k order by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, sum(c), uniqMerge(u) from test.summing_merge_tree_aggregate_function group by k order by k;
drop table test.summing_merge_tree_aggregate_function;
drop table test.summing_merge_tree_null;

View File

@ -252,24 +252,24 @@ ghij
1
0
1
0
0
1
0
0
1
0
1
1
0
1
0
1
1
1
0
0
0
1
0
1
0
0
0
1
1
1
1
@ -284,13 +284,13 @@ ghij
0
1
1
0
0
0
1
0
0
0
1
1
1
1
1
1
1
----- Aggregation -----
A 0 2

View File

@ -14,9 +14,6 @@ SELECT * FROM test.merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.nu
SELECT * FROM test.merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id;
SELECT * FROM test.merge_tree_in_subqueries WHERE name IN (SELECT 'test' || toString(number) FROM system.numbers LIMIT 2, 3) ORDER BY id;
/* This should be removed when the issue of using the index for tuples in the IN operator is addressed. */
SET force_primary_key = 0;
SELECT id AS id2, name AS value FROM test.merge_tree_in_subqueries WHERE (value, id2) IN (SELECT 'test' || toString(number), number FROM system.numbers LIMIT 2, 3) ORDER BY id;
-- Non-index scans.

View File

View File

@ -4,7 +4,6 @@ set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
${CLICKHOUSE_CURL} ${CLICKHOUSE_URL}?query="select" -X POST -F "query= 1;" 2>/dev/null
${CLICKHOUSE_CURL} "${CLICKHOUSE_URL}?query=select" -X POST --form-string 'query= 1;' 2>/dev/null
echo -ne '1,Hello\n2,World\n' | ${CLICKHOUSE_CURL} -sSF 'file=@-' "${CLICKHOUSE_URL}?file_format=CSV&file_types=UInt8,String&query=SELE" -X POST -F "query=CT * FROM file" 2>/dev/null
echo -ne '1,Hello\n2,World\n' | ${CLICKHOUSE_CURL} -sS -F 'file=@-' "${CLICKHOUSE_URL}?file_format=CSV&file_types=UInt8,String&query=SELE" -X POST --form-string 'query=CT * FROM file' 2>/dev/null

View File

@ -0,0 +1,8 @@
\'single quote\' not end 123 2016-01-01
\'em good 456 2016-01-02
\'single quote\' not end 123 2016-01-01
\'em good 456 2016-01-02
"double quote" not end 123 2016-01-01
"em good 456 2016-01-02
"double quote" not end 123 2016-01-01
"em good 456 2016-01-02

View File

@ -0,0 +1,44 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.csv";
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.csv (s String, n UInt64, d Date) ENGINE = Memory";
echo "'single quote' not end, 123, 2016-01-01
'em good, 456, 2016-01-02" | $CLICKHOUSE_CLIENT --format_csv_allow_single_quotes=0 --query="INSERT INTO test.csv FORMAT CSV";
$CLICKHOUSE_CLIENT --query="SELECT * FROM test.csv ORDER BY d";
$CLICKHOUSE_CLIENT --query="DROP TABLE test.csv";
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.csv (s String, n UInt64, d Date) ENGINE = Memory";
echo "'single quote' not end, 123, 2016-01-01
'em good, 456, 2016-01-02" | $CLICKHOUSE_CLIENT --multiquery --query="SET format_csv_allow_single_quotes=0; INSERT INTO test.csv FORMAT CSV";
$CLICKHOUSE_CLIENT --query="SELECT * FROM test.csv ORDER BY d";
$CLICKHOUSE_CLIENT --query="DROP TABLE test.csv";
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.csv";
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.csv (s String, n UInt64, d Date) ENGINE = Memory";
echo '"double quote" not end, 123, 2016-01-01
"em good, 456, 2016-01-02' | $CLICKHOUSE_CLIENT --format_csv_allow_double_quotes=0 --query="INSERT INTO test.csv FORMAT CSV";
$CLICKHOUSE_CLIENT --query="SELECT * FROM test.csv ORDER BY d";
$CLICKHOUSE_CLIENT --query="DROP TABLE test.csv";
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.csv (s String, n UInt64, d Date) ENGINE = Memory";
echo '"double quote" not end, 123, 2016-01-01
"em good, 456, 2016-01-02' | $CLICKHOUSE_CLIENT --multiquery --query="SET format_csv_allow_double_quotes=0; INSERT INTO test.csv FORMAT CSV";
$CLICKHOUSE_CLIENT --query="SELECT * FROM test.csv ORDER BY d";
$CLICKHOUSE_CLIENT --query="DROP TABLE test.csv";

View File

@ -1,3 +1,5 @@
Query should fail 1
Query should fail 2
2000-01-01 2 b
2000-01-01 5 e
2000-02-01 2 b

View File

@ -13,17 +13,22 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.mutations_r2(d Date, x UInt32, s
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE x = 1"
# Insert some data
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1 VALUES ('2000-01-01', 1, 'a')"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1 VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1(d, x, s) VALUES \
('2000-01-01', 1, 'a')"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1(d, x, s) VALUES \
('2000-01-01', 2, 'b'), ('2000-01-01', 3, 'c'), ('2000-01-01', 4, 'd') \
('2000-02-01', 2, 'b'), ('2000-02-01', 3, 'c'), ('2000-02-01', 4, 'd')"
# Try some malformed queries that should fail validation.
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE nonexistent = 0" 2>/dev/null || echo "Query should fail 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE d = '11'" 2>/dev/null || echo "Query should fail 2"
# Delete some values
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE x % 2 = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE s = 'd'"
# Insert more data
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1 VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1(d, x, s) VALUES \
('2000-01-01', 5, 'e'), ('2000-02-01', 5, 'e')"
# Wait until all mutations are done.
@ -36,7 +41,6 @@ do
if [[ $i -eq 100 ]]; then
echo "Timed out while waiting for mutations to execute!"
exit 1
fi
done

View File

@ -0,0 +1,5 @@
drop table if exists test.table;
create table test.table (val Int32) engine = MergeTree order by val;
insert into test.table values (-2), (0), (2);
select count() from test.table where toUInt64(val) == 0;

View File

@ -0,0 +1,21 @@
0
1
4
5
-
0
\N
1
\N
7
-
\N
0
-
\N
\N
0
2
\N
\N
2

View File

@ -0,0 +1,8 @@
select runningDifference(x) from (select arrayJoin([0, 1, 5, 10]) as x);
select '-';
select runningDifference(x) from (select arrayJoin([2, Null, 3, Null, 10]) as x);
select '-';
select runningDifference(x) from (select arrayJoin([Null, 1]) as x);
select '-';
select runningDifference(x) from (select arrayJoin([Null, Null, 1, 3, Null, Null, 5]) as x);

View File

@ -0,0 +1,7 @@
2000-01-01 1 first 1
2000-01-01 1 first 2
2000-01-01 2 first 2
2000-01-02 1 first 3
2000-01-01 1 first 3
2000-01-01 2 first 2
2000-01-02 1 first 3

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS test.partitioned_by_tuple;
CREATE TABLE test.partitioned_by_tuple (d Date, x UInt8, w String, y UInt8) ENGINE SummingMergeTree (y) PARTITION BY (d, x) ORDER BY (d, x, w);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-02', 1, 'first', 3);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-01', 2, 'first', 2);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-01', 1, 'first', 1), ('2000-01-01', 1, 'first', 2);
OPTIMIZE TABLE test.partitioned_by_tuple;
SELECT * FROM test.partitioned_by_tuple ORDER BY d, x, w, y;
OPTIMIZE TABLE test.partitioned_by_tuple FINAL;
SELECT * FROM test.partitioned_by_tuple ORDER BY d, x, w, y;
DROP TABLE test.partitioned_by_tuple;

View File

@ -0,0 +1,5 @@
[NULL,'str1','str2'] 1
['str1','str2'] 1
[] 0
[] 0
1

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test.has_function;
CREATE TABLE test.has_function(arr Array(Nullable(String))) ENGINE = Memory;
INSERT INTO test.has_function(arr) values ([null, 'str1', 'str2']),(['str1', 'str2']), ([]), ([]);
SELECT arr, has(`arr`, 'str1') FROM test.has_function;
SELECT has([null, 'str1', 'str2'], 'str1');
DROP TABLE test.has_function;

View File

@ -0,0 +1,56 @@
1
1
0
0
1
1
0
1
1
1
0
0
1
1
0
1
1
1
0
1
1
0
1
1
0
1
1
0
1
1
0
0
1
1
0
1
1
1
0
0
1
1
0
1
1
1
0
1
1
0
1
1
0
1
1
0

View File

@ -0,0 +1,72 @@
SELECT has(['a', 'b'], 'a');
SELECT has(['a', 'b'], 'b');
SELECT has(['a', 'b'], 'c');
SELECT has(['a', 'b'], NULL);
SELECT has(['a', NULL, 'b'], 'a');
SELECT has(['a', NULL, 'b'], 'b');
SELECT has(['a', NULL, 'b'], 'c');
SELECT has(['a', NULL, 'b'], NULL);
SELECT has(materialize(['a', 'b']), 'a');
SELECT has(materialize(['a', 'b']), 'b');
SELECT has(materialize(['a', 'b']), 'c');
SELECT has(materialize(['a', 'b']), NULL);
SELECT has(materialize(['a', NULL, 'b']), 'a');
SELECT has(materialize(['a', NULL, 'b']), 'b');
SELECT has(materialize(['a', NULL, 'b']), 'c');
SELECT has(materialize(['a', NULL, 'b']), NULL);
SELECT has(['a', 'b'], materialize('a'));
SELECT has(['a', 'b'], materialize('b'));
SELECT has(['a', 'b'], materialize('c'));
SELECT has(['a', NULL, 'b'], materialize('a'));
SELECT has(['a', NULL, 'b'], materialize('b'));
SELECT has(['a', NULL, 'b'], materialize('c'));
SELECT has(materialize(['a', 'b']), materialize('a'));
SELECT has(materialize(['a', 'b']), materialize('b'));
SELECT has(materialize(['a', 'b']), materialize('c'));
SELECT has(materialize(['a', NULL, 'b']), materialize('a'));
SELECT has(materialize(['a', NULL, 'b']), materialize('b'));
SELECT has(materialize(['a', NULL, 'b']), materialize('c'));
SELECT has([111, 222], 111);
SELECT has([111, 222], 222);
SELECT has([111, 222], 333);
SELECT has([111, 222], NULL);
SELECT has([111, NULL, 222], 111);
SELECT has([111, NULL, 222], 222);
SELECT has([111, NULL, 222], 333);
SELECT has([111, NULL, 222], NULL);
SELECT has(materialize([111, 222]), 111);
SELECT has(materialize([111, 222]), 222);
SELECT has(materialize([111, 222]), 333);
SELECT has(materialize([111, 222]), NULL);
SELECT has(materialize([111, NULL, 222]), 111);
SELECT has(materialize([111, NULL, 222]), 222);
SELECT has(materialize([111, NULL, 222]), 333);
SELECT has(materialize([111, NULL, 222]), NULL);
SELECT has([111, 222], materialize(111));
SELECT has([111, 222], materialize(222));
SELECT has([111, 222], materialize(333));
SELECT has([111, NULL, 222], materialize(111));
SELECT has([111, NULL, 222], materialize(222));
SELECT has([111, NULL, 222], materialize(333));
SELECT has(materialize([111, 222]), materialize(111));
SELECT has(materialize([111, 222]), materialize(222));
SELECT has(materialize([111, 222]), materialize(333));
SELECT has(materialize([111, NULL, 222]), materialize(111));
SELECT has(materialize([111, NULL, 222]), materialize(222));
SELECT has(materialize([111, NULL, 222]), materialize(333));

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test.empty;
DROP TABLE IF EXISTS test.data;
CREATE TABLE test.empty (value Int8) ENGINE = TinyLog;
CREATE TABLE test.data (value Int8) ENGINE = TinyLog;
INSERT INTO test.data SELECT * FROM test.empty;
SELECT * FROM test.data;
INSERT INTO test.data SELECT 1;
SELECT * FROM test.data;
DROP TABLE test.empty;
DROP TABLE test.data;

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (1.1.54388) unstable; urgency=low
clickhouse (1.1.54390) unstable; urgency=low
* Modified source code
-- <robot-metrika-test@yandex-team.ru> Wed, 27 Jun 2018 16:10:59 +0300
-- <robot-metrika-test@yandex-team.ru> Fri, 06 Jul 2018 19:37:50 +0300

View File

@ -8,10 +8,10 @@ There are libraries for working with ClickHouse for:
- [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver)
- [clickhouse-client](https://github.com/yurial/clickhouse-client)
- PHP
- [clickhouse-php-client](https://github.com/8bitov/clickhouse-php-client)
- [PhpClickHouseClient](https://github.com/SevaCode/PhpClickHouseClient)
- [phpClickHouse](https://github.com/smi2/phpClickHouse)
- [clickhouse-php-client](https://github.com/8bitov/clickhouse-php-client)
- [clickhouse-client](https://github.com/bozerkins/clickhouse-client)
- [PhpClickHouseClient](https://github.com/SevaCode/PhpClickHouseClient)
- Go
- [clickhouse](https://github.com/kshvakov/clickhouse/)
- [go-clickhouse](https://github.com/roistat/go-clickhouse)

View File

@ -40,7 +40,7 @@ SELECT uniq(UserID) FROM table
SELECT uniqMerge(state) FROM (SELECT uniqState(UserID) AS state FROM table GROUP BY RegionID)
```
There is an ` AggregatingMergeTree` engine. Its job during a merge is to combine the states of aggregate functions from different table rows with the same primary key value.
There is an `AggregatingMergeTree` engine. Its job during a merge is to combine the states of aggregate functions from different table rows with the same primary key value.
You can't use a normal INSERT to insert a row in a table containing `AggregateFunction` columns, because you can't explicitly define the `AggregateFunction` value. Instead, use `INSERT SELECT` with `-State` aggregate functions for inserting data.

View File

@ -16,7 +16,7 @@ The columns to total are set explicitly (the last parameter Shows, Clicks, C
If the values were null in all of these columns, the row is deleted. (The exception is cases when the data part would not have any rows left in it.)
For the other rows that are not part of the primary key, the first value that occurs is selected when merging.
For the other columns that are not part of the primary key, the first value that occurs is selected when merging. But if a column is of AggregateFunction type, then it is merged according to that function, which effectively makes this engine behave like `AggregatingMergeTree`.
Summation is not performed for a read operation. If it is necessary, write the appropriate GROUP BY.

View File

@ -8,10 +8,10 @@
- [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver)
- [clickhouse-client](https://github.com/yurial/clickhouse-client)
- PHP
- [clickhouse-php-client](https://github.com/8bitov/clickhouse-php-client)
- [PhpClickHouseClient](https://github.com/SevaCode/PhpClickHouseClient)
- [phpClickHouse](https://github.com/smi2/phpClickHouse)
- [clickhouse-php-client](https://github.com/8bitov/clickhouse-php-client)
- [clickhouse-client](https://github.com/bozerkins/clickhouse-client)
- [PhpClickHouseClient](https://github.com/SevaCode/PhpClickHouseClient)
- Go
- [clickhouse](https://github.com/kshvakov/clickhouse/)
- [go-clickhouse](https://github.com/roistat/go-clickhouse)

View File

@ -16,7 +16,7 @@ SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, ...), 8192, (Shows, C
Если значения во всех таких столбцах оказались нулевыми, то строчка удаляется. (За исключением случаев, когда в куске данных не осталось бы ни одной строчки.)
Для остальных столбцов, не входящих в первичный ключ, при слиянии выбирается первое попавшееся значение.
Для остальных столбцов, не входящих в первичный ключ, при слиянии выбирается первое попавшееся значение. Но для столбцов типа AggregateFunction выполняется агрегация согласно заданной функции, так что этот движок фактически ведёт себя как `AggregatingMergeTree`.
При чтении, суммирование не делается само по себе. Если оно необходимо - напишите соответствующий GROUP BY.

View File

@ -6,16 +6,17 @@
#include <boost/iterator/counting_iterator.hpp>
/** \brief Numeric range iterator, used to represent a half-closed interval [begin, end).
* In conjunction with std::reverse_iterator allows for forward and backward iteration
* over corresponding interval. */
/** Numeric range iterator, used to represent a half-closed interval [begin, end).
* In conjunction with std::reverse_iterator allows for forward and backward iteration
* over corresponding interval.
*/
namespace ext
{
template <typename T>
using range_iterator = boost::counting_iterator<T>;
/** \brief Range-based for loop adapter for (reverse_)range_iterator.
* By and large should be in conjunction with ext::range and ext::reverse_range.
/** Range-based for loop adapter for (reverse_)range_iterator.
* By and large should be in conjunction with ext::range and ext::reverse_range.
*/
template <typename T>
struct range_wrapper
@ -30,12 +31,12 @@ namespace ext
iterator end() const { return iterator(end_); }
};
/** \brief Constructs range_wrapper for forward-iteration over [begin, end) in range-based for loop.
* Usage example:
* for (const auto i : ext::range(0, 4)) print(i);
* Output:
* 0 1 2 3
*/
/** Constructs range_wrapper for forward-iteration over [begin, end) in range-based for loop.
* Usage example:
* for (const auto i : ext::range(0, 4)) print(i);
* Output:
* 0 1 2 3
*/
template <typename T1, typename T2>
inline range_wrapper<typename std::common_type<T1, T2>::type> range(T1 begin, T2 end)
{