Merge branch 'master' into update_keeper_config

This commit is contained in:
alesapin 2021-10-19 10:16:57 +03:00
commit f2d266acce
106 changed files with 1175 additions and 673 deletions

View File

@ -320,7 +320,7 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
- `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)`
Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) that contains all ngrams from a block of data. Works only with strings. Can be used for optimization of `equals`, `like` and `in` expressions.
Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) that contains all ngrams from a block of data. Works only with the data types: [String](../../../sql-reference/data-types/string.md), [FixedString](../../../sql-reference/data-types/fixedstring.md) and [Map](../../../sql-reference/data-types/map.md). Can be used for optimization of `EQUALS`, `LIKE` and `IN` expressions.
- `n` — ngram size,
- `size_of_bloom_filter_in_bytes` — Bloom filter size in bytes (you can use large values here, for example, 256 or 512, because it can be compressed well).
@ -337,7 +337,9 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`, `Array`, `LowCardinality`, `Nullable`, `UUID`, `Map`.
For the `Map` data type, the client can specify if the index should be created for keys or values using the [mapKeys](../../../sql-reference/functions/tuple-map-functions.md#mapkeys) or [mapValues](../../../sql-reference/functions/tuple-map-functions.md#mapvalues) function.
The following functions can use the filter: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions.md), [notIn](../../../sql-reference/functions/in-functions.md), [has](../../../sql-reference/functions/array-functions.md#hasarr-elem).
Example of index creation for the `Map` data type:
@ -346,9 +348,6 @@ INDEX map_key_index mapKeys(map_column) TYPE bloom_filter GRANULARITY 1
INDEX map_value_index mapValues(map_column) TYPE bloom_filter GRANULARITY 1
```
The following functions can use it: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions.md), [notIn](../../../sql-reference/functions/in-functions.md), [has](../../../sql-reference/functions/array-functions.md).
<!-- -->
``` sql
INDEX sample_index (u64 * length(s)) TYPE minmax GRANULARITY 4

View File

@ -166,5 +166,6 @@ toc_title: Adopters
| <a href="https://beeline.ru/" class="favicon">Beeline</a> | Telecom | Data Platform | — | — | [Blog post, July 2021](https://habr.com/en/company/beeline/blog/567508/) |
| <a href="https://ecommpay.com/" class="favicon">Ecommpay</a> | Payment Processing | Logs | — | — | [Video, Nov 2019](https://www.youtube.com/watch?v=d3GdZTOWGLk) |
| <a href="https://omnicomm.ru/" class="favicon">Omnicomm</a> | Transportation Monitoring | — | — | — | [Facebook post, Oct 2021](https://www.facebook.com/OmnicommTeam/posts/2824479777774500) |
| <a href="https://ok.ru" class="favicon">Ok.ru</a> | Social Network | — | 72 servers | 810 TB compressed, 50bn rows/day, 1.5 TB/day | [SmartData conference, Oct 2021](https://assets.ctfassets.net/oxjq45e8ilak/4JPHkbJenLgZhBGGyyonFP/57472ec6987003ec4078d0941740703b/____________________ClickHouse_______________________.pdf) |
[Original article](https://clickhouse.com/docs/en/introduction/adopters/) <!--hide-->

View File

@ -316,17 +316,26 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
#### Available Types of Indices {#available-types-of-indices}
- `minmax` — Stores the minimum and maximum of the expression (if the expression is a `tuple`, then for each element of the `tuple`), using them for skipping blocks, similarly to the primary key.
- `minmax` — stores the minimum and maximum of the expression (if the expression is [Tuple](../../../sql-reference/data-types/tuple.md), then for each element of the `Tuple`), using them for skipping blocks, similarly to the primary key.
- `set(max_rows)` — Stores unique values of the expression within a block, at most `max_rows` of them (if `max_rows = 0`, there is no limit), using them to skip blocks by evaluating whether the `WHERE` expression can be satisfied on the stored data.
- `set(max_rows)` — stores unique values of the expression within a block, at most `max_rows` of them (if `max_rows = 0`, there is no limit), using them to skip blocks by evaluating whether the `WHERE` expression can be satisfied on the stored data.
- `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)` — stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) containing all N-grams of the data block. Works only with the data types [String](../../../sql-reference/data-types/string.md), [FixedString](../../../sql-reference/data-types/fixedstring.md) and [Map](../../../sql-reference/data-types/map.md) with keys of type `String` or `FixedString`. Can be used for optimization of `EQUALS`, `LIKE` and `IN` expressions.
- `n` — N-gram size,
- `size_of_bloom_filter_in_bytes` — Bloom filter size in bytes (you can use large values here, for example, 256 or 512, because compression offsets the possible overhead).
- `number_of_hash_functions` — the number of hash functions used in the Bloom filter.
- `random_seed` — the seed for the Bloom filter hash functions.
- `tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)` — the same as `ngrambf_v1`, but stores tokens instead of N-grams. Tokens are sequences of characters separated by non-alphanumeric characters.
- `bloom_filter([false_positive])` — a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) for the specified columns.
The optional `false_positive` parameter is the probability of a false positive. Possible values: (0, 1). Default value: 0.025.
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`.
The filter can be used by the following functions: [equals](../../../engines/table-engines/mergetree-family/mergetree.md), [notEquals](../../../engines/table-engines/mergetree-family/mergetree.md), [in](../../../engines/table-engines/mergetree-family/mergetree.md), [notIn](../../../engines/table-engines/mergetree-family/mergetree.md).
The filter can be used by the following functions: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions.md), [notIn](../../../sql-reference/functions/in-functions.md), [has](../../../sql-reference/functions/array-functions.md#hasarr-elem).
**Examples**

View File

@ -2684,6 +2684,43 @@ SELECT CAST(toNullable(toInt32(0)) AS Int32) as x, toTypeName(x);
Default value: `1`.
## output_format_csv_null_representation {#output_format_csv_null_representation}
Defines the `NULL` representation for the [CSV](../../interfaces/formats.md#csv) output format. The user can set any string as the value, for example, `My NULL`.
Default value: `\N`.
**Examples**
Query:
```sql
SELECT * FROM csv_custom_null FORMAT CSV;
```
Result:
```text
788
\N
\N
```
Query:
```sql
SET output_format_csv_null_representation = 'My NULL';
SELECT * FROM csv_custom_null FORMAT CSV;
```
Result:
```text
788
My NULL
My NULL
```
## output_format_tsv_null_representation {#output_format_tsv_null_representation}
Defines the `NULL` representation for the [TSV](../../interfaces/formats.md#tabseparated) output format. The user can set any string as the value.

View File

@ -358,8 +358,8 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeKeeperDispatcher();
/// Initialize keeper RAFT. Do nothing if no keeper_server in config.
global_context->initializeKeeperDispatcher(/* start_async = */false);
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper

View File

@ -1000,8 +1000,19 @@ if (ThreadFuzzer::instance().isEffective())
if (config().has("keeper_server"))
{
#if USE_NURAFT
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeKeeperDispatcher();
/// If we don't have a configured connection, probably someone is trying to use clickhouse-server instead
/// of clickhouse-keeper, so start synchronously.
bool can_initialize_keeper_async = false;
if (has_zookeeper) /// We have configured connection to some zookeeper cluster
{
/// If we cannot connect to some other node from our cluster, then we have to wait for our Keeper to start
/// synchronously.
can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
}
/// Initialize keeper RAFT.
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper
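
The decision above reduces to a single predicate: start Keeper asynchronously only when a connection to some node of our Keeper cluster already works, otherwise block startup until RAFT is initialized. A minimal sketch of that predicate, assuming a hypothetical `can_reach_any_peer` callback standing in for `tryCheckClientConnectionToMyKeeperCluster()`:

```cpp
#include <functional>

/// Sketch only, not part of this commit. Models the branch above:
/// no configured ZooKeeper/Keeper connection means someone is probably using
/// clickhouse-server in place of clickhouse-keeper, so start synchronously.
bool chooseAsyncKeeperStart(bool has_zookeeper, const std::function<bool()> & can_reach_any_peer)
{
    if (!has_zookeeper)
        return false;
    /// Async start is safe only if some other node of the cluster is reachable.
    return can_reach_any_peer();
}
```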

View File

@ -40,13 +40,7 @@ public:
CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parse codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Just wrapper for previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check, allow_experimental_codecs);
}
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;

View File

@ -53,7 +53,7 @@ void CompressionCodecFactory::validateCodec(
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{
if (const auto * func = ast->as<ASTFunction>())
{
@ -100,12 +100,13 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
if (column_type)
{
CompressionCodecPtr prev_codec;
IDataType::StreamCallbackWithType callback = [&](
const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
ISerialization::StreamCallback callback = [&](const auto & substream_path)
{
assert(!substream_path.empty());
if (ISerialization::isSpecialCompressionAllowed(substream_path))
{
result_codec = getImpl(codec_family_name, codec_arguments, &substream_type);
const auto & last_type = substream_path.back().data.type;
result_codec = getImpl(codec_family_name, codec_arguments, last_type.get());
/// Case for column Tuple, which compressed with codec which depends on data type, like Delta.
/// We cannot substitute parameters for such codecs.
@ -115,8 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
}
};
ISerialization::SubstreamPath stream_path;
column_type->enumerateStreams(column_type->getDefaultSerialization(), callback, stream_path);
ISerialization::SubstreamPath path;
column_type->getDefaultSerialization()->enumerateStreams(path, callback, column_type, nullptr);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());

View File

@ -28,7 +28,7 @@ struct Settings;
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How long we will wait until RAFT shutdown", 0) \
M(Milliseconds, startup_timeout, 30000, "How long we will wait until RAFT starts", 0) \
M(Milliseconds, startup_timeout, 180000, "How long we will wait until RAFT starts", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \

View File

@ -250,7 +250,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
return true;
}
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper)
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async)
{
LOG_DEBUG(log, "Initializing storage dispatcher");
int myid = config.getInt("keeper_server.server_id");
@ -271,10 +271,16 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
server->startup();
LOG_DEBUG(log, "Server initialized, waiting for quorum");
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
updateConfiguration(config);
if (!start_async)
{
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
updateConfiguration(config);
}
else
{
LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it is ready");
}
}
catch (...)
{
@ -368,7 +374,7 @@ void KeeperDispatcher::sessionCleanerTask()
try
{
/// Only leader node must check dead sessions
if (isLeader())
if (server->checkInit() && isLeader())
{
auto dead_sessions = server->getDeadSessions();
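
With asynchronous startup, background tasks can now run before RAFT has a quorum, so `sessionCleanerTask` guards itself with `checkInit()` before asking who the leader is. A self-contained sketch of that guard with hypothetical stand-in types:

```cpp
#include <atomic>

/// Sketch only, not part of this commit.
struct MiniServer
{
    std::atomic<bool> initialized{false};
    bool checkInit() const { return initialized.load(); }
    bool isLeader() const { return initialized.load(); } /// hypothetical stub
};

/// Until RAFT is initialized the node cannot know whether it is the leader,
/// so dead-session cleanup must be skipped.
bool shouldCleanDeadSessions(const MiniServer & server)
{
    return server.checkInit() && server.isLeader();
}
```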

View File

@ -99,7 +99,12 @@ public:
/// Initialization from config.
/// standalone_keeper -- we are standalone keeper application (not inside clickhouse server)
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper);
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async);
bool checkInit() const
{
return server && server->checkInit();
}
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);

View File

@ -369,6 +369,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
void KeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait for RAFT initialization");
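
`waitInit` is the standard condition-variable-with-timeout pattern: wait on a flag, and throw if the deadline passes first. A self-contained sketch of the same pattern (names are illustrative, not from the diff):

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

/// Sketch only, not part of this commit.
struct InitGate
{
    std::mutex mutex;
    std::condition_variable cv;
    std::atomic<bool> ready{false};

    void markReady()
    {
        ready = true;
        cv.notify_all();
    }

    /// Blocks until markReady() is called, throwing after timeout_ms.
    void wait(int64_t timeout_ms)
    {
        std::unique_lock lock(mutex);
        if (!cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [&] { return ready.load(); }))
            throw std::runtime_error("Failed to wait for RAFT initialization");
    }
};
```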

View File

@ -80,6 +80,12 @@ public:
/// Wait for server initialization (see callbackFunc)
void waitInit();
/// Return true if KeeperServer initialized
bool checkInit() const
{
return initialized_flag;
}
void shutdown();
int getServerID() const { return server_id; }

View File

@ -702,7 +702,7 @@ ColumnPtr getColumnFromBlock(const Block & block, const NameAndTypePair & column
current_column = current_column->decompress();
if (column.isSubcolumn())
return column.getTypeInStorage()->getSubcolumn(column.getSubcolumnName(), *current_column);
return column.getTypeInStorage()->getSubcolumn(column.getSubcolumnName(), current_column);
return current_column;
}

View File

@ -3,8 +3,6 @@
#include <Columns/ColumnAggregateFunction.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/AlignedBuffer.h>
#include <Common/FieldVisitorToString.h>

View File

@ -1,17 +1,9 @@
#include <Columns/ColumnArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationArray.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <Parsers/IAST.h>
@ -53,69 +45,6 @@ bool DataTypeArray::equals(const IDataType & rhs) const
return typeid(rhs) == typeid(*this) && nested->equals(*static_cast<const DataTypeArray &>(rhs).nested);
}
DataTypePtr DataTypeArray::tryGetSubcolumnType(const String & subcolumn_name) const
{
return tryGetSubcolumnTypeImpl(subcolumn_name, 0);
}
DataTypePtr DataTypeArray::tryGetSubcolumnTypeImpl(const String & subcolumn_name, size_t level) const
{
if (subcolumn_name == "size" + std::to_string(level))
return std::make_shared<DataTypeUInt64>();
DataTypePtr subcolumn;
if (const auto * nested_array = typeid_cast<const DataTypeArray *>(nested.get()))
subcolumn = nested_array->tryGetSubcolumnTypeImpl(subcolumn_name, level + 1);
else
subcolumn = nested->tryGetSubcolumnType(subcolumn_name);
if (subcolumn && subcolumn_name != MAIN_SUBCOLUMN_NAME)
subcolumn = std::make_shared<DataTypeArray>(std::move(subcolumn));
return subcolumn;
}
ColumnPtr DataTypeArray::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
return getSubcolumnImpl(subcolumn_name, column, 0);
}
ColumnPtr DataTypeArray::getSubcolumnImpl(const String & subcolumn_name, const IColumn & column, size_t level) const
{
const auto & column_array = assert_cast<const ColumnArray &>(column);
if (subcolumn_name == "size" + std::to_string(level))
return arrayOffsetsToSizes(column_array.getOffsetsColumn());
ColumnPtr subcolumn;
if (const auto * nested_array = typeid_cast<const DataTypeArray *>(nested.get()))
subcolumn = nested_array->getSubcolumnImpl(subcolumn_name, column_array.getData(), level + 1);
else
subcolumn = nested->getSubcolumn(subcolumn_name, column_array.getData());
return ColumnArray::create(subcolumn, column_array.getOffsetsPtr());
}
SerializationPtr DataTypeArray::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
return getSubcolumnSerializationImpl(subcolumn_name, base_serialization_getter, 0);
}
SerializationPtr DataTypeArray::getSubcolumnSerializationImpl(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter, size_t level) const
{
if (subcolumn_name == "size" + std::to_string(level))
return std::make_shared<SerializationTupleElement>(base_serialization_getter(DataTypeUInt64()), subcolumn_name, false);
SerializationPtr subcolumn;
if (const auto * nested_array = typeid_cast<const DataTypeArray *>(nested.get()))
subcolumn = nested_array->getSubcolumnSerializationImpl(subcolumn_name, base_serialization_getter, level + 1);
else
subcolumn = nested->getSubcolumnSerialization(subcolumn_name, base_serialization_getter);
return std::make_shared<SerializationArray>(subcolumn);
}
SerializationPtr DataTypeArray::doGetDefaultSerialization() const
{
return std::make_shared<SerializationArray>(nested->getDefaultSerialization());

View File

@ -54,23 +54,12 @@ public:
return nested->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion();
}
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
SerializationPtr doGetDefaultSerialization() const override;
const DataTypePtr & getNestedType() const { return nested; }
/// 1 for plain array, 2 for array of arrays and so on.
size_t getNumberOfDimensions() const;
private:
ColumnPtr getSubcolumnImpl(const String & subcolumn_name, const IColumn & column, size_t level) const;
DataTypePtr tryGetSubcolumnTypeImpl(const String & subcolumn_name, size_t level) const;
SerializationPtr getSubcolumnSerializationImpl(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter, size_t level) const;
};
}

View File

@ -1,14 +1,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/Serializations/SerializationDate.h>
#include <DataTypes/DataTypeFactory.h>
#include <Common/assert_cast.h>
namespace DB
{

View File

@ -1,28 +1,12 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/Serializations/SerializationDateTime.h>
#include <Columns/ColumnVector.h>
#include <Common/assert_cast.h>
#include <base/DateLUT.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/parseDateTimeBestEffort.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
TimezoneMixin::TimezoneMixin(const String & time_zone_name)
: has_explicit_time_zone(!time_zone_name.empty()),
time_zone(DateLUT::instance(time_zone_name)),
utc_time_zone(DateLUT::instance("UTC"))
{
}
DataTypeDateTime::DataTypeDateTime(const String & time_zone_name)
: TimezoneMixin(time_zone_name)
{
@ -52,7 +36,7 @@ bool DataTypeDateTime::equals(const IDataType & rhs) const
SerializationPtr DataTypeDateTime::doGetDefaultSerialization() const
{
return std::make_shared<SerializationDateTime>(time_zone, utc_time_zone);
return std::make_shared<SerializationDateTime>(*this);
}
}

View File

@ -2,33 +2,11 @@
#include <Core/Types.h>
#include <DataTypes/DataTypeNumberBase.h>
class DateLUTImpl;
#include <DataTypes/TimezoneMixin.h>
namespace DB
{
/** Mixin-class that manages timezone info for timezone-aware DateTime implementations
*
* Must be used as a (second) base for class implementing IDateType-interface.
*/
class TimezoneMixin
{
public:
explicit TimezoneMixin(const String & time_zone_name = "");
TimezoneMixin(const TimezoneMixin &) = default;
const DateLUTImpl & getTimeZone() const { return time_zone; }
bool hasExplicitTimeZone() const { return has_explicit_time_zone; }
protected:
/// true if time zone name was provided in data type parameters, false if it's using default time zone.
bool has_explicit_time_zone;
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
};
/** DateTime stores time as unix timestamp.
* The value itself is independent of time zone.
*

View File

@ -1,19 +1,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/Serializations/SerializationDateTime64.h>
#include <Columns/ColumnVector.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <base/DateLUT.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/parseDateTimeBestEffort.h>
#include <Parsers/ASTLiteral.h>
#include <optional>
#include <string>
@ -65,7 +53,7 @@ bool DataTypeDateTime64::equals(const IDataType & rhs) const
SerializationPtr DataTypeDateTime64::doGetDefaultSerialization() const
{
return std::make_shared<SerializationDateTime64>(time_zone, utc_time_zone, scale);
return std::make_shared<SerializationDateTime64>(scale, *this);
}
}

View File

@ -1,15 +1,5 @@
#include <DataTypes/DataTypeDecimalBase.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST.h>
#include <type_traits>
namespace DB

View File

@ -1,5 +1,4 @@
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/Serializations/SerializationEnum.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -1,22 +1,12 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationFixedString.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
namespace DB
{

View File

@ -1,9 +1,7 @@
#include <base/map.h>
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnArray.h>
#include <Core/Field.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
@ -11,14 +9,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationMap.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTNameTypePair.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -84,27 +75,6 @@ std::string DataTypeMap::doGetName() const
return s.str();
}
static const IColumn & extractNestedColumn(const IColumn & column)
{
return assert_cast<const ColumnMap &>(column).getNestedColumn();
}
DataTypePtr DataTypeMap::tryGetSubcolumnType(const String & subcolumn_name) const
{
return nested->tryGetSubcolumnType(subcolumn_name);
}
ColumnPtr DataTypeMap::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
return nested->getSubcolumn(subcolumn_name, extractNestedColumn(column));
}
SerializationPtr DataTypeMap::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
return nested->getSubcolumnSerialization(subcolumn_name, base_serialization_getter);
}
MutableColumnPtr DataTypeMap::createColumn() const
{
return ColumnMap::create(nested->createColumn());

View File

@ -32,11 +32,6 @@ public:
bool canBeInsideNullable() const override { return false; }
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;

View File

@ -2,7 +2,6 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/quoteString.h>
#include <Parsers/ASTNameTypePair.h>

View File

@ -1,10 +1,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/Serializations/SerializationNothing.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnNothing.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
namespace DB

View File

@ -1,17 +1,9 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <Columns/ColumnNullable.h>
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ConcatReadBuffer.h>
#include <Parsers/IAST.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
@ -63,32 +55,6 @@ bool DataTypeNullable::equals(const IDataType & rhs) const
return rhs.isNullable() && nested_data_type->equals(*static_cast<const DataTypeNullable &>(rhs).nested_data_type);
}
DataTypePtr DataTypeNullable::tryGetSubcolumnType(const String & subcolumn_name) const
{
if (subcolumn_name == "null")
return std::make_shared<DataTypeUInt8>();
return nested_data_type->tryGetSubcolumnType(subcolumn_name);
}
ColumnPtr DataTypeNullable::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
const auto & column_nullable = assert_cast<const ColumnNullable &>(column);
if (subcolumn_name == "null")
return column_nullable.getNullMapColumnPtr();
return nested_data_type->getSubcolumn(subcolumn_name, column_nullable.getNestedColumn());
}
SerializationPtr DataTypeNullable::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
if (subcolumn_name == "null")
return std::make_shared<SerializationTupleElement>(base_serialization_getter(DataTypeUInt8()), subcolumn_name, false);
return nested_data_type->getSubcolumnSerialization(subcolumn_name, base_serialization_getter);
}
SerializationPtr DataTypeNullable::doGetDefaultSerialization() const
{
return std::make_shared<SerializationNullable>(nested_data_type->getDefaultSerialization());

View File

@ -41,11 +41,6 @@ public:
bool onlyNull() const override;
bool canBeInsideLowCardinality() const override { return nested_data_type->canBeInsideLowCardinality(); }
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
const DataTypePtr & getNestedType() const { return nested_data_type; }
private:
SerializationPtr doGetDefaultSerialization() const override;

View File

@ -1,13 +1,6 @@
#include <type_traits>
#include <DataTypes/DataTypeNumberBase.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnConst.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -1,14 +1,6 @@
#include <Core/Defines.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationString.h>
@ -16,15 +8,6 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#ifdef __SSE2__
#include <emmintrin.h>
#endif
namespace DB
{

View File

@ -1,7 +1,5 @@
#pragma once
#include <ostream>
#include <DataTypes/IDataType.h>

View File

@ -3,20 +3,17 @@
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnTuple.h>
#include <Core/Field.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationTuple.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNamed.h>
#include <DataTypes/NestedUtils.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTNameTypePair.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -30,7 +27,6 @@ namespace ErrorCodes
extern const int DUPLICATE_COLUMN;
extern const int EMPTY_DATA_PASSED;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH;
}
@ -107,11 +103,6 @@ static inline IColumn & extractElementColumn(IColumn & column, size_t idx)
return assert_cast<ColumnTuple &>(column).getColumn(idx);
}
static inline const IColumn & extractElementColumn(const IColumn & column, size_t idx)
{
return assert_cast<const ColumnTuple &>(column).getColumn(idx);
}
template <typename F>
static void addElementSafe(const DataTypes & elems, IColumn & column, F && impl)
{
@ -234,74 +225,6 @@ size_t DataTypeTuple::getSizeOfValueInMemory() const
return res;
}
template <typename OnSuccess, typename OnContinue>
auto DataTypeTuple::getSubcolumnEntity(const String & subcolumn_name,
const OnSuccess & on_success, const OnContinue & on_continue) const
{
using ReturnType = decltype(on_success(0));
for (size_t i = 0; i < names.size(); ++i)
{
if (startsWith(subcolumn_name, names[i]))
{
size_t name_length = names[i].size();
if (subcolumn_name.size() == name_length)
return on_success(i);
if (subcolumn_name[name_length] == '.')
return on_continue(i, subcolumn_name.substr(name_length + 1));
}
}
return ReturnType{};
}
DataTypePtr DataTypeTuple::tryGetSubcolumnType(const String & subcolumn_name) const
{
if (subcolumn_name == MAIN_SUBCOLUMN_NAME)
return shared_from_this();
auto on_success = [&](size_t pos) { return elems[pos]; };
auto on_continue = [&](size_t pos, const String & next_subcolumn) { return elems[pos]->tryGetSubcolumnType(next_subcolumn); };
return getSubcolumnEntity(subcolumn_name, on_success, on_continue);
}
ColumnPtr DataTypeTuple::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
auto on_success = [&](size_t pos) { return extractElementColumn(column, pos).getPtr(); };
auto on_continue = [&](size_t pos, const String & next_subcolumn)
{
return elems[pos]->getSubcolumn(next_subcolumn, extractElementColumn(column, pos));
};
if (auto subcolumn = getSubcolumnEntity(subcolumn_name, on_success, on_continue))
return subcolumn;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
SerializationPtr DataTypeTuple::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
auto on_success = [&](size_t pos)
{
return std::make_shared<SerializationTupleElement>(base_serialization_getter(*elems[pos]), names[pos]);
};
auto on_continue = [&](size_t pos, const String & next_subcolumn)
{
auto next_serialization = elems[pos]->getSubcolumnSerialization(next_subcolumn, base_serialization_getter);
return std::make_shared<SerializationTupleElement>(next_serialization, names[pos]);
};
if (auto serialization = getSubcolumnEntity(subcolumn_name, on_success, on_continue))
return serialization;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
{
SerializationTuple::ElementSerializations serializations(elems.size());
@ -310,7 +233,7 @@ SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
{
String elem_name = use_explicit_names ? names[i] : toString(i + 1);
auto serialization = elems[i]->getDefaultSerialization();
serializations[i] = std::make_shared<SerializationTupleElement>(serialization, elem_name);
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
}
return std::make_shared<SerializationTuple>(std::move(serializations), use_explicit_names);
@ -325,7 +248,7 @@ SerializationPtr DataTypeTuple::getSerialization(const String & column_name, con
String elem_name = use_explicit_names ? names[i] : toString(i + 1);
auto subcolumn_name = Nested::concatenateName(column_name, elem_name);
auto serialization = elems[i]->getSerialization(subcolumn_name, callback);
serializations[i] = std::make_shared<SerializationTupleElement>(serialization, elem_name);
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
}
return std::make_shared<SerializationTuple>(std::move(serializations), use_explicit_names);

View File

@ -52,16 +52,11 @@ public:
size_t getMaximumSizeOfValueInMemory() const override;
size_t getSizeOfValueInMemory() const override;
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSerialization(const String & column_name, const StreamExistenceCallback & callback) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
SerializationPtr doGetDefaultSerialization() const override;
const DataTypePtr & getElement(size_t i) const { return elems[i]; }
const DataTypes & getElements() const { return elems; }
const Strings & getElementNames() const { return names; }
@ -69,11 +64,6 @@ public:
bool haveExplicitNames() const { return have_explicit_names; }
bool serializeNames() const { return serialize_names; }
private:
template <typename OnSuccess, typename OnContinue>
auto getSubcolumnEntity(const String & subcolumn_name,
const OnSuccess & on_success, const OnContinue & on_continue) const;
};
}

View File

@ -1,16 +1,13 @@
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/Serializations/SerializationDecimal.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/readDecimalText.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST.h>
#include <type_traits>

View File

@ -2,7 +2,6 @@
#include <Columns/ColumnConst.h>
#include <Common/Exception.h>
#include <Common/escapeForFileName.h>
#include <Common/SipHash.h>
#include <IO/WriteHelpers.h>
@ -11,7 +10,6 @@
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeCustom.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
namespace DB
@ -65,12 +63,40 @@ size_t IDataType::getSizeOfValueInMemory() const
throw Exception("Value of type " + getName() + " in memory is not of fixed size.", ErrorCodes::LOGICAL_ERROR);
}
void IDataType::forEachSubcolumn(
const SubcolumnCallback & callback,
const SerializationPtr & serialization,
const DataTypePtr & type,
const ColumnPtr & column)
{
ISerialization::StreamCallback callback_with_data = [&](const auto & subpath)
{
for (size_t i = 0; i < subpath.size(); ++i)
{
if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, i + 1))
{
auto name = ISerialization::getSubcolumnNameForStream(subpath, i + 1);
auto data = ISerialization::createFromPath(subpath, i);
callback(subpath, name, data);
}
subpath[i].visited = true;
}
};
ISerialization::SubstreamPath path;
serialization->enumerateStreams(path, callback_with_data, type, column);
}
DataTypePtr IDataType::tryGetSubcolumnType(const String & subcolumn_name) const
{
if (subcolumn_name == MAIN_SUBCOLUMN_NAME)
return shared_from_this();
DataTypePtr res;
forEachSubcolumn([&](const auto &, const auto & name, const auto & data)
{
if (name == subcolumn_name)
res = data.type;
}, getDefaultSerialization(), getPtr(), nullptr);
return nullptr;
return res;
}
DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
@ -82,42 +108,43 @@ DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const IColumn &) const
SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const
{
SerializationPtr res;
forEachSubcolumn([&](const auto &, const auto & name, const auto & data)
{
if (name == subcolumn_name)
res = data.serialization;
}, serialization, nullptr, nullptr);
if (res)
return res;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
void IDataType::forEachSubcolumn(const SubcolumnCallback & callback) const
ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
NameSet set;
getDefaultSerialization()->enumerateStreams([&, this](const ISerialization::SubstreamPath & substream_path)
ColumnPtr res;
forEachSubcolumn([&](const auto &, const auto & name, const auto & data)
{
ISerialization::SubstreamPath new_path;
/// Iterate over path to try to get intermediate subcolumns for complex nested types.
for (const auto & elem : substream_path)
{
new_path.push_back(elem);
auto name = ISerialization::getSubcolumnNameForStream(new_path);
auto type = tryGetSubcolumnType(name);
if (name == subcolumn_name)
res = data.column;
}, getDefaultSerialization(), nullptr, column);
/// Subcolumn names may repeat among several substream paths.
if (!name.empty() && type && !set.count(name))
{
callback(name, type, substream_path);
set.insert(name);
}
}
});
if (res)
return res;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
Names IDataType::getSubcolumnNames() const
{
Names res;
forEachSubcolumn([&](const auto & name, const auto &, const auto &)
forEachSubcolumn([&](const auto &, const auto & name, const auto &)
{
res.push_back(name);
});
}, getDefaultSerialization(), nullptr, nullptr);
return res;
}
@ -144,24 +171,14 @@ SerializationPtr IDataType::getDefaultSerialization() const
return doGetDefaultSerialization();
}
SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const BaseSerializationGetter &) const
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
// static
SerializationPtr IDataType::getSerialization(const NameAndTypePair & column, const IDataType::StreamExistenceCallback & callback)
{
if (column.isSubcolumn())
{
/// Wrap to custom serialization deepest subcolumn, which is represented in non-complex type.
auto base_serialization_getter = [&](const IDataType & subcolumn_type)
{
return subcolumn_type.getSerialization(column.name, callback);
};
const auto & type_in_storage = column.getTypeInStorage();
return type_in_storage->getSubcolumnSerialization(column.getSubcolumnName(), base_serialization_getter);
auto default_serialization = type_in_storage->getDefaultSerialization();
return type_in_storage->getSubcolumnSerialization(column.getSubcolumnName(), default_serialization);
}
return column.type->getSerialization(column.name, callback);
@ -172,21 +189,4 @@ SerializationPtr IDataType::getSerialization(const String &, const StreamExisten
return getDefaultSerialization();
}
DataTypePtr IDataType::getTypeForSubstream(const ISerialization::SubstreamPath & substream_path) const
{
auto type = tryGetSubcolumnType(ISerialization::getSubcolumnNameForStream(substream_path));
if (type)
return type->getSubcolumnType(MAIN_SUBCOLUMN_NAME);
return getSubcolumnType(MAIN_SUBCOLUMN_NAME);
}
void IDataType::enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback, ISerialization::SubstreamPath & path) const
{
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
callback(substream_path, *getTypeForSubstream(substream_path));
}, path);
}
}
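
`tryGetSubcolumnType`, `getSubcolumnSerialization` and `getSubcolumn` above all share one shape: enumerate every subcolumn with `forEachSubcolumn` and capture the piece of `SubstreamData` whose name matches. A generic sketch of that find-by-name pattern (illustrative, not the repository's API):

```cpp
#include <optional>
#include <string>
#include <utility>
#include <vector>

/// Sketch only, not part of this commit: enumerate (name, value) pairs and
/// keep the value whose name matches, as the three getters above do.
template <typename T>
std::optional<T> findByName(const std::vector<std::pair<std::string, T>> & subcolumns, const std::string & wanted)
{
    std::optional<T> res;
    for (const auto & [name, value] : subcolumns)
        if (name == wanted)
            res = value;
    return res;
}
```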

View File

@ -70,19 +70,31 @@ public:
return doGetName();
}
DataTypePtr getPtr() const { return shared_from_this(); }
/// Name of data type family (example: FixedString, Array).
virtual const char * getFamilyName() const = 0;
/// Data type id. It's used for runtime type checks.
virtual TypeIndex getTypeId() const = 0;
static constexpr auto MAIN_SUBCOLUMN_NAME = "__main";
virtual DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const;
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const;
DataTypePtr getSubcolumnType(const String & subcolumn_name) const;
virtual ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const;
using SubcolumnCallback = std::function<void(const String &, const DataTypePtr &, const ISerialization::SubstreamPath &)>;
void forEachSubcolumn(const SubcolumnCallback & callback) const;
SerializationPtr getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const;
ColumnPtr getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const;
using SubcolumnCallback = std::function<void(
const ISerialization::SubstreamPath &,
const String &,
const ISerialization::SubstreamData &)>;
static void forEachSubcolumn(
const SubcolumnCallback & callback,
const SerializationPtr & serialization,
const DataTypePtr & type,
const ColumnPtr & column);
Names getSubcolumnNames() const;
/// Returns default serialization of data type.
@ -93,7 +105,6 @@ public:
/// one of serialization types, that serialization will be chosen for reading.
/// If callback always returned false, the default serialization will be chosen.
using StreamExistenceCallback = std::function<bool(const String &)>;
using BaseSerializationGetter = std::function<SerializationPtr(const IDataType &)>;
/// Chooses serialization for reading of one column or subcolumns by
/// checking existence of substreams using callback.
@ -103,22 +114,10 @@ public:
virtual SerializationPtr getSerialization(const String & column_name, const StreamExistenceCallback & callback) const;
/// Returns serialization wrapper for reading one particular subcolumn of data type.
virtual SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const;
using StreamCallbackWithType = std::function<void(const ISerialization::SubstreamPath &, const IDataType &)>;
void enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback, ISerialization::SubstreamPath & path) const;
void enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback, ISerialization::SubstreamPath && path) const { enumerateStreams(serialization, callback, path); }
void enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback) const { enumerateStreams(serialization, callback, {}); }
protected:
virtual String doGetName() const { return getFamilyName(); }
virtual SerializationPtr doGetDefaultSerialization() const = 0;
DataTypePtr getTypeForSubstream(const ISerialization::SubstreamPath & substream_path) const;
public:
/** Create empty column for corresponding type.
*/

View File

@ -5,6 +5,7 @@
#include <IO/Operators.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/NestedUtils.h>
#include <base/EnumReflection.h>
namespace DB
@ -17,30 +18,11 @@ namespace ErrorCodes
String ISerialization::Substream::toString() const
{
switch (type)
{
case ArrayElements:
return "ArrayElements";
case ArraySizes:
return "ArraySizes";
case NullableElements:
return "NullableElements";
case NullMap:
return "NullMap";
case TupleElement:
return "TupleElement(" + tuple_element_name + ", "
+ std::to_string(escape_tuple_delimiter) + ")";
case DictionaryKeys:
return "DictionaryKeys";
case DictionaryIndexes:
return "DictionaryIndexes";
case SparseElements:
return "SparseElements";
case SparseOffsets:
return "SparseOffsets";
}
if (type == TupleElement)
return fmt::format("TupleElement({}, escape_tuple_delimiter={})",
tuple_element_name, escape_tuple_delimiter ? "true" : "false");
__builtin_unreachable();
return String(magic_enum::enum_name(type));
}
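
The rewrite above replaces a hand-written switch with `magic_enum::enum_name`, which recovers an enumerator's identifier via compile-time reflection (pulled in through `base/EnumReflection.h`). A self-contained example, assuming the single-header magic_enum library is available:

```cpp
#include <iostream>
#include <magic_enum.hpp>

enum class SubstreamKind { ArraySizes, NullMap, DictionaryKeys };

int main()
{
    /// Prints "NullMap": the enumerator's own name, no switch required.
    std::cout << magic_enum::enum_name(SubstreamKind::NullMap) << '\n';
}
```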
String ISerialization::SubstreamPath::toString() const
@ -57,9 +39,21 @@ String ISerialization::SubstreamPath::toString() const
return wb.str();
}
void ISerialization::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
path.push_back(Substream::Regular);
path.back().data = {type, column, getPtr(), nullptr};
callback(path);
path.pop_back();
}
void ISerialization::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
enumerateStreams(path, callback, nullptr, nullptr);
}
void ISerialization::serializeBinaryBulk(const IColumn & column, WriteBuffer &, size_t, size_t) const
@ -104,40 +98,48 @@ void ISerialization::deserializeBinaryBulkWithMultipleStreams(
}
}
static String getNameForSubstreamPath(
namespace
{
using SubstreamIterator = ISerialization::SubstreamPath::const_iterator;
String getNameForSubstreamPath(
String stream_name,
const ISerialization::SubstreamPath & path,
SubstreamIterator begin,
SubstreamIterator end,
bool escape_tuple_delimiter)
{
using Substream = ISerialization::Substream;
size_t array_level = 0;
for (const auto & elem : path)
for (auto it = begin; it != end; ++it)
{
if (elem.type == Substream::NullMap)
if (it->type == Substream::NullMap)
stream_name += ".null";
else if (elem.type == Substream::ArraySizes)
else if (it->type == Substream::ArraySizes)
stream_name += ".size" + toString(array_level);
else if (elem.type == Substream::ArrayElements)
else if (it->type == Substream::ArrayElements)
++array_level;
else if (elem.type == Substream::DictionaryKeys)
else if (it->type == Substream::DictionaryKeys)
stream_name += ".dict";
else if (elem.type == Substream::SparseOffsets)
else if (it->type == Substream::SparseOffsets)
stream_name += ".sparse.idx";
else if (elem.type == Substream::TupleElement)
else if (it->type == Substream::TupleElement)
{
/// For compatibility reasons, we use %2E (escaped dot) instead of dot.
/// Because nested data may be represented not by Array of Tuple,
/// but by separate Array columns with names in the form of a.b,
/// and the name is encoded as a whole.
stream_name += (escape_tuple_delimiter && elem.escape_tuple_delimiter ?
escapeForFileName(".") : ".") + escapeForFileName(elem.tuple_element_name);
stream_name += (escape_tuple_delimiter && it->escape_tuple_delimiter ?
escapeForFileName(".") : ".") + escapeForFileName(it->tuple_element_name);
}
}
return stream_name;
}
}
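
For orientation, the suffixes produced above compose into the on-disk stream names; under the usual ClickHouse conventions a `Nullable(UInt32)` column `x` yields streams `x` and `x.null`, an `Array(Array(UInt32))` column `arr` yields sizes streams `arr.size0` and `arr.size1`, a `LowCardinality(String)` column `s` yields `s.dict`, and a tuple element's file name escapes the dot as `%2E` (e.g. `t%2Ea`). A toy version of the join-then-strip-leading-dot rule used by `getSubcolumnNameForStream` below:

```cpp
#include <string>
#include <vector>

/// Sketch only, not part of this commit: concatenate per-substream suffixes
/// and drop the leading dot, yielding names like "null" or "size0".
std::string joinSubcolumnName(const std::vector<std::string> & suffixes)
{
    std::string name;
    for (const auto & suffix : suffixes)
        name += "." + suffix;
    return name.empty() ? name : name.substr(1);
}
```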
String ISerialization::getFileNameForStream(const NameAndTypePair & column, const SubstreamPath & path)
{
return getFileNameForStream(column.getNameInStorage(), path);
@ -152,12 +154,17 @@ String ISerialization::getFileNameForStream(const String & name_in_storage, cons
else
stream_name = escapeForFileName(name_in_storage);
return getNameForSubstreamPath(std::move(stream_name), path, true);
return getNameForSubstreamPath(std::move(stream_name), path.begin(), path.end(), true);
}
String ISerialization::getSubcolumnNameForStream(const SubstreamPath & path)
{
auto subcolumn_name = getNameForSubstreamPath("", path, false);
return getSubcolumnNameForStream(path, path.size());
}
String ISerialization::getSubcolumnNameForStream(const SubstreamPath & path, size_t prefix_len)
{
auto subcolumn_name = getNameForSubstreamPath("", path.begin(), path.begin() + prefix_len, false);
if (!subcolumn_name.empty())
subcolumn_name = subcolumn_name.substr(1); // It starts with a dot.
@ -195,4 +202,44 @@ bool ISerialization::isSpecialCompressionAllowed(const SubstreamPath & path)
return true;
}
size_t ISerialization::getArrayLevel(const SubstreamPath & path)
{
size_t level = 0;
for (const auto & elem : path)
level += elem.type == Substream::ArrayElements;
return level;
}
bool ISerialization::hasSubcolumnForPath(const SubstreamPath & path, size_t prefix_len)
{
if (prefix_len == 0 || prefix_len > path.size())
return false;
size_t last_elem = prefix_len - 1;
return path[last_elem].type == Substream::NullMap
|| path[last_elem].type == Substream::TupleElement
|| path[last_elem].type == Substream::ArraySizes;
}
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
{
assert(prefix_len < path.size());
SubstreamData res = path[prefix_len].data;
res.creator.reset();
for (ssize_t i = static_cast<ssize_t>(prefix_len) - 1; i >= 0; --i)
{
const auto & creator = path[i].data.creator;
if (creator)
{
res.type = res.type ? creator->create(res.type) : res.type;
res.serialization = res.serialization ? creator->create(res.serialization) : res.serialization;
res.column = res.column ? creator->create(res.column) : res.column;
}
}
return res;
}
}
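
`createFromPath` starts from the `SubstreamData` stored at the prefix element and then walks the ancestors from the inside out, letting each one's `ISubcolumnCreator` re-wrap the type, serialization and column (e.g. wrapping a subcolumn back into an `Array`). A minimal sketch of that wrap-outward loop over plain strings:

```cpp
#include <functional>
#include <string>
#include <vector>

using Wrap = std::function<std::string(const std::string &)>;

/// Sketch only, not part of this commit: apply each enclosing creator from
/// index prefix_len - 1 down to 0, mirroring the loop in createFromPath.
std::string buildFromPath(const std::vector<Wrap> & creators, size_t prefix_len, std::string innermost)
{
    for (size_t i = prefix_len; i-- > 0;)
        if (creators[i])
            innermost = creators[i](innermost);
    return innermost;
}
```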

View File

@ -2,35 +2,39 @@
#include <Common/COW.h>
#include <Core/Types.h>
#include <Columns/IColumn.h>
#include <boost/noncopyable.hpp>
#include <unordered_map>
#include <memory>
namespace DB
{
class IDataType;
class ReadBuffer;
class WriteBuffer;
class ProtobufReader;
class ProtobufWriter;
class IColumn;
using ColumnPtr = COW<IColumn>::Ptr;
using MutableColumnPtr = COW<IColumn>::MutablePtr;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
class ISerialization;
using SerializationPtr = std::shared_ptr<const ISerialization>;
class Field;
struct FormatSettings;
struct NameAndTypePair;
class ISerialization
class ISerialization : private boost::noncopyable, public std::enable_shared_from_this<ISerialization>
{
public:
ISerialization() = default;
virtual ~ISerialization() = default;
SerializationPtr getPtr() const { return shared_from_this(); }
/** Binary serialization for range of values in column - for writing to disk/network, etc.
*
* Some data types are represented in multiple streams while being serialized.
@ -54,6 +58,24 @@ public:
* Default implementations of ...WithMultipleStreams methods will call serializeBinaryBulk, deserializeBinaryBulk for single stream.
*/
struct ISubcolumnCreator
{
virtual DataTypePtr create(const DataTypePtr & prev) const = 0;
virtual SerializationPtr create(const SerializationPtr & prev) const = 0;
virtual ColumnPtr create(const ColumnPtr & prev) const = 0;
virtual ~ISubcolumnCreator() = default;
};
using SubcolumnCreatorPtr = std::shared_ptr<const ISubcolumnCreator>;
struct SubstreamData
{
DataTypePtr type;
ColumnPtr column;
SerializationPtr serialization;
SubcolumnCreatorPtr creator;
};
struct Substream
{
enum Type
@ -71,7 +93,10 @@ public:
SparseElements,
SparseOffsets,
Regular,
};
Type type;
/// Index of the tuple element (starting at 1), or its name.
@ -80,6 +105,12 @@ public:
/// Do we need to escape a dot in filenames for tuple elements.
bool escape_tuple_delimiter = true;
/// Data for current substream.
SubstreamData data;
/// Flag, that may help to traverse substream paths.
mutable bool visited = false;
Substream(Type type_) : type(type_) {}
String toString() const;
@ -96,7 +127,13 @@ public:
using StreamCallback = std::function<void(const SubstreamPath &)>;
virtual void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const;
virtual void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const;
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
@ -249,11 +286,16 @@ public:
static String getFileNameForStream(const NameAndTypePair & column, const SubstreamPath & path);
static String getFileNameForStream(const String & name_in_storage, const SubstreamPath & path);
static String getSubcolumnNameForStream(const SubstreamPath & path);
static String getSubcolumnNameForStream(const SubstreamPath & path, size_t prefix_len);
static void addToSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path, ColumnPtr column);
static ColumnPtr getFromSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path);
static bool isSpecialCompressionAllowed(const SubstreamPath & path);
static size_t getArrayLevel(const SubstreamPath & path);
static bool hasSubcolumnForPath(const SubstreamPath & path, size_t prefix_len);
static SubstreamData createFromPath(const SubstreamPath & path, size_t prefix_len);
};
using SerializationPtr = std::shared_ptr<const ISerialization>;

View File

@ -1,7 +1,6 @@
#include <DataTypes/Serializations/SerializationAggregateFunction.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnAggregateFunction.h>

View File

@ -1,5 +1,8 @@
#include <DataTypes/Serializations/SerializationArray.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <DataTypes/Serializations/SerializationNamed.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -177,16 +180,53 @@ ColumnPtr arrayOffsetsToSizes(const IColumn & column)
return column_sizes;
}
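For intuition about what `arrayOffsetsToSizes` computes: ClickHouse stores array offsets cumulatively, so per-row sizes are recovered by differencing. A self-contained sketch of the same transformation on plain vectors (illustrative, not the IColumn-based implementation above):

``` cpp
#include <cstdint>
#include <vector>

/// Sketch of the offsets -> sizes transformation on plain vectors:
/// offsets are cumulative end positions, sizes are per-row array lengths.
std::vector<uint64_t> offsetsToSizes(const std::vector<uint64_t> & offsets)
{
    std::vector<uint64_t> sizes(offsets.size());
    for (size_t i = 0; i < offsets.size(); ++i)
        sizes[i] = offsets[i] - (i == 0 ? 0 : offsets[i - 1]);
    return sizes;    /// e.g. offsets {3, 5, 6} -> sizes {3, 2, 1}
}
```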
void SerializationArray::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
DataTypePtr SerializationArray::SubcolumnCreator::create(const DataTypePtr & prev) const
{
path.push_back(Substream::ArraySizes);
callback(path);
path.back() = Substream::ArrayElements;
nested->enumerateStreams(callback, path);
path.pop_back();
return std::make_shared<DataTypeArray>(prev);
}
SerializationPtr SerializationArray::SubcolumnCreator::create(const SerializationPtr & prev) const
{
return std::make_shared<SerializationArray>(prev);
}
ColumnPtr SerializationArray::SubcolumnCreator::create(const ColumnPtr & prev) const
{
return ColumnArray::create(prev, offsets);
}
void SerializationArray::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
const auto * type_array = type ? &assert_cast<const DataTypeArray &>(*type) : nullptr;
const auto * column_array = column ? &assert_cast<const ColumnArray &>(*column) : nullptr;
auto offsets_column = column_array ? column_array->getOffsetsPtr() : nullptr;
path.push_back(Substream::ArraySizes);
path.back().data =
{
type ? std::make_shared<DataTypeUInt64>() : nullptr,
offsets_column ? arrayOffsetsToSizes(*offsets_column) : nullptr,
std::make_shared<SerializationNamed>(
std::make_shared<SerializationNumber<UInt64>>(),
"size" + std::to_string(getArrayLevel(path)), false),
nullptr,
};
callback(path);
path.back() = Substream::ArrayElements;
path.back().data = {type, column, getPtr(), std::make_shared<SubcolumnCreator>(offsets_column)};
auto next_type = type_array ? type_array->getNestedType() : nullptr;
auto next_column = column_array ? column_array->getDataPtr() : nullptr;
nested->enumerateStreams(path, callback, next_type, next_column);
path.pop_back();
}
void SerializationArray::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -35,7 +35,11 @@ public:
* This is necessary because, when implementing nested structures, several arrays can have common sizes.
*/
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
@ -62,6 +66,18 @@ public:
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const override;
private:
struct SubcolumnCreator : public ISubcolumnCreator
{
const ColumnPtr offsets;
SubcolumnCreator(const ColumnPtr & offsets_) : offsets(offsets_) {}
DataTypePtr create(const DataTypePtr & prev) const override;
SerializationPtr create(const SerializationPtr & prev) const override;
ColumnPtr create(const ColumnPtr & prev) const override;
};
};
ColumnPtr arrayOffsetsToSizes(const IColumn & column);

View File

@ -8,6 +8,7 @@
namespace DB
{
void SerializationDate32::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const
{
writeDateText(ExtendedDayNum(assert_cast<const ColumnInt32 &>(column).getData()[row_num]), ostr);

View File

@ -32,9 +32,8 @@ inline void readText(time_t & x, ReadBuffer & istr, const FormatSettings & setti
}
SerializationDateTime::SerializationDateTime(
const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_)
: time_zone(time_zone_), utc_time_zone(utc_time_zone_)
SerializationDateTime::SerializationDateTime(const TimezoneMixin & time_zone_)
: TimezoneMixin(time_zone_)
{
}

View File

@ -1,20 +1,17 @@
#pragma once
#include <DataTypes/Serializations/SerializationNumber.h>
#include <DataTypes/TimezoneMixin.h>
class DateLUTImpl;
namespace DB
{
class SerializationDateTime final : public SerializationNumber<UInt32>
class SerializationDateTime final : public SerializationNumber<UInt32>, public TimezoneMixin
{
private:
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
public:
SerializationDateTime(const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_);
SerializationDateTime(const TimezoneMixin & time_zone_);
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;

View File

@ -17,9 +17,9 @@ namespace DB
{
SerializationDateTime64::SerializationDateTime64(
const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_, UInt32 scale_)
UInt32 scale_, const TimezoneMixin & time_zone_)
: SerializationDecimalBase<DateTime64>(DecimalUtils::max_precision<DateTime64>, scale_)
, time_zone(time_zone_), utc_time_zone(utc_time_zone_)
, TimezoneMixin(time_zone_)
{
}

View File

@ -1,20 +1,17 @@
#pragma once
#include <DataTypes/Serializations/SerializationDecimalBase.h>
#include <DataTypes/TimezoneMixin.h>
class DateLUTImpl;
namespace DB
{
class SerializationDateTime64 final : public SerializationDecimalBase<DateTime64>
class SerializationDateTime64 final : public SerializationDecimalBase<DateTime64>, public TimezoneMixin
{
private:
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
public:
SerializationDateTime64(const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_, UInt32 scale_);
SerializationDateTime64(UInt32 scale_, const TimezoneMixin & time_zone_);
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;

View File

@ -40,11 +40,27 @@ SerializationLowCardinality::SerializationLowCardinality(const DataTypePtr & dic
{
}
void SerializationLowCardinality::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationLowCardinality::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
const auto * column_lc = column ? &getColumnLowCardinality(*column) : nullptr;
SubstreamData data;
data.type = type ? dictionary_type : nullptr;
data.column = column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr;
data.serialization = dict_inner_serialization;
path.push_back(Substream::DictionaryKeys);
dict_inner_serialization->enumerateStreams(callback, path);
path.back().data = data;
dict_inner_serialization->enumerateStreams(path, callback, data.type, data.column);
path.back() = Substream::DictionaryIndexes;
path.back().data = {type, column, getPtr(), nullptr};
callback(path);
path.pop_back();
}

View File

@ -17,7 +17,11 @@ private:
public:
SerializationLowCardinality(const DataTypePtr & dictionary_type);
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -3,6 +3,7 @@
#include <DataTypes/Serializations/SerializationMap.h>
#include <DataTypes/Serializations/SerializationArray.h>
#include <DataTypes/Serializations/SerializationTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnMap.h>
@ -250,10 +251,16 @@ void SerializationMap::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
deserializeText(column, rb, settings);
}
void SerializationMap::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationMap::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
nested->enumerateStreams(callback, path);
auto next_type = type ? assert_cast<const DataTypeMap &>(*type).getNestedType() : nullptr;
auto next_column = column ? assert_cast<const ColumnMap &>(*column).getNestedColumnPtr() : nullptr;
nested->enumerateStreams(path, callback, next_type, next_column);
}
void SerializationMap::serializeBinaryBulkStatePrefix(

View File

@ -31,7 +31,11 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -1,18 +1,21 @@
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNamed.h>
namespace DB
{
void SerializationTupleElement::enumerateStreams(
void SerializationNamed::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
SubstreamPath & path) const
DataTypePtr type,
ColumnPtr column) const
{
addToPath(path);
nested_serialization->enumerateStreams(callback, path);
path.back().data = {type, column, getPtr(), std::make_shared<SubcolumnCreator>(name, escape_delimiter)};
nested_serialization->enumerateStreams(path, callback, type, column);
path.pop_back();
}
void SerializationTupleElement::serializeBinaryBulkStatePrefix(
void SerializationNamed::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
@ -21,7 +24,7 @@ void SerializationTupleElement::serializeBinaryBulkStatePrefix(
settings.path.pop_back();
}
void SerializationTupleElement::serializeBinaryBulkStateSuffix(
void SerializationNamed::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
@ -30,7 +33,7 @@ void SerializationTupleElement::serializeBinaryBulkStateSuffix(
settings.path.pop_back();
}
void SerializationTupleElement::deserializeBinaryBulkStatePrefix(
void SerializationNamed::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
@ -39,7 +42,7 @@ void SerializationTupleElement::deserializeBinaryBulkStatePrefix(
settings.path.pop_back();
}
void SerializationTupleElement::serializeBinaryBulkWithMultipleStreams(
void SerializationNamed::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
size_t offset,
size_t limit,
@ -51,7 +54,7 @@ void SerializationTupleElement::serializeBinaryBulkWithMultipleStreams(
settings.path.pop_back();
}
void SerializationTupleElement::deserializeBinaryBulkWithMultipleStreams(
void SerializationNamed::deserializeBinaryBulkWithMultipleStreams(
ColumnPtr & column,
size_t limit,
DeserializeBinaryBulkSettings & settings,
@ -63,7 +66,7 @@ void SerializationTupleElement::deserializeBinaryBulkWithMultipleStreams(
settings.path.pop_back();
}
void SerializationTupleElement::addToPath(SubstreamPath & path) const
void SerializationNamed::addToPath(SubstreamPath & path) const
{
path.push_back(Substream::TupleElement);
path.back().tuple_element_name = name;

View File

@ -5,14 +5,14 @@
namespace DB
{
class SerializationTupleElement final : public SerializationWrapper
class SerializationNamed final : public SerializationWrapper
{
private:
String name;
bool escape_delimiter;
public:
SerializationTupleElement(const SerializationPtr & nested_, const String & name_, bool escape_delimiter_ = true)
SerializationNamed(const SerializationPtr & nested_, const String & name_, bool escape_delimiter_ = true)
: SerializationWrapper(nested_)
, name(name_), escape_delimiter(escape_delimiter_)
{
@ -21,11 +21,13 @@ public:
const String & getElementName() const { return name; }
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
SubstreamPath & path) const override;
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkStateSuffix(
@ -51,6 +53,22 @@ public:
SubstreamsCache * cache) const override;
private:
struct SubcolumnCreator : public ISubcolumnCreator
{
const String name;
const bool escape_delimiter;
SubcolumnCreator(const String & name_, bool escape_delimiter_)
: name(name_), escape_delimiter(escape_delimiter_) {}
DataTypePtr create(const DataTypePtr & prev) const override { return prev; }
ColumnPtr create(const ColumnPtr & prev) const override { return prev; }
SerializationPtr create(const SerializationPtr & prev) const override
{
return std::make_shared<SerializationNamed>(prev, name, escape_delimiter);
}
};
void addToPath(SubstreamPath & path) const;
};

View File

@ -1,5 +1,8 @@
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <DataTypes/Serializations/SerializationNamed.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnNullable.h>
#include <Core/Field.h>
@ -20,15 +23,50 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}
void SerializationNullable::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
DataTypePtr SerializationNullable::SubcolumnCreator::create(const DataTypePtr & prev) const
{
path.push_back(Substream::NullMap);
callback(path);
path.back() = Substream::NullableElements;
nested->enumerateStreams(callback, path);
path.pop_back();
return std::make_shared<DataTypeNullable>(prev);
}
SerializationPtr SerializationNullable::SubcolumnCreator::create(const SerializationPtr & prev) const
{
return std::make_shared<SerializationNullable>(prev);
}
ColumnPtr SerializationNullable::SubcolumnCreator::create(const ColumnPtr & prev) const
{
return ColumnNullable::create(prev, null_map);
}
void SerializationNullable::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
const auto * type_nullable = type ? &assert_cast<const DataTypeNullable &>(*type) : nullptr;
const auto * column_nullable = column ? &assert_cast<const ColumnNullable &>(*column) : nullptr;
path.push_back(Substream::NullMap);
path.back().data =
{
type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr,
column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr,
std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false),
nullptr,
};
callback(path);
path.back() = Substream::NullableElements;
/// Pass the null map explicitly: the assignment above has just reset path.back().data.
path.back().data = {type, column, getPtr(), std::make_shared<SubcolumnCreator>(column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr)};
auto next_type = type_nullable ? type_nullable->getNestedType() : nullptr;
auto next_column = column_nullable ? column_nullable->getNestedColumnPtr() : nullptr;
nested->enumerateStreams(path, callback, next_type, next_column);
path.pop_back();
}
void SerializationNullable::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -13,7 +13,11 @@ private:
public:
SerializationNullable(const SerializationPtr & nested_) : nested(nested_) {}
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
@ -80,6 +84,18 @@ public:
static ReturnType deserializeTextCSVImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested);
template <typename ReturnType = bool>
static ReturnType deserializeTextJSONImpl(IColumn & column, ReadBuffer & istr, const FormatSettings &, const SerializationPtr & nested);
private:
struct SubcolumnCreator : public ISubcolumnCreator
{
const ColumnPtr null_map;
SubcolumnCreator(const ColumnPtr & null_map_) : null_map(null_map_) {}
DataTypePtr create(const DataTypePtr & prev) const override;
SerializationPtr create(const SerializationPtr & prev) const override;
ColumnPtr create(const ColumnPtr & prev) const override;
};
};
}

View File

@ -1,6 +1,7 @@
#include <base/map.h>
#include <base/range.h>
#include <DataTypes/Serializations/SerializationTuple.h>
#include <DataTypes/DataTypeTuple.h>
#include <Core/Field.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
@ -281,10 +282,22 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
});
}
void SerializationTuple::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationTuple::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
for (const auto & elem : elems)
elem->enumerateStreams(callback, path);
const auto * type_tuple = type ? &assert_cast<const DataTypeTuple &>(*type) : nullptr;
const auto * column_tuple = column ? &assert_cast<const ColumnTuple &>(*column) : nullptr;
for (size_t i = 0; i < elems.size(); ++i)
{
auto next_type = type_tuple ? type_tuple->getElement(i) : nullptr;
auto next_column = column_tuple ? column_tuple->getColumnPtr(i) : nullptr;
elems[i]->enumerateStreams(path, callback, next_type, next_column);
}
}
struct SerializeBinaryBulkStateTuple : public ISerialization::SerializeBinaryBulkState

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataTypes/Serializations/SimpleTextSerialization.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNamed.h>
namespace DB
{
@ -9,7 +9,7 @@ namespace DB
class SerializationTuple final : public SimpleTextSerialization
{
public:
using ElementSerializationPtr = std::shared_ptr<const SerializationTupleElement>;
using ElementSerializationPtr = std::shared_ptr<const SerializationNamed>;
using ElementSerializations = std::vector<ElementSerializationPtr>;
SerializationTuple(const ElementSerializations & elems_, bool have_explicit_names_)
@ -31,7 +31,11 @@ public:
/** Each sub-column in a tuple is serialized in separate stream.
*/
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -4,9 +4,13 @@
namespace DB
{
void SerializationWrapper::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationWrapper::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
nested_serialization->enumerateStreams(callback, path);
nested_serialization->enumerateStreams(path, callback, type, column);
}
void SerializationWrapper::serializeBinaryBulkStatePrefix(

View File

@ -16,7 +16,11 @@ protected:
public:
SerializationWrapper(const SerializationPtr & nested_serialization_) : nested_serialization(nested_serialization_) {}
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -0,0 +1,32 @@
#pragma once
#include <Core/Types.h>
#include <base/DateLUT.h>
class DateLUTImpl;
/** Mixin class that manages time zone info for time-zone-aware DateTime implementations.
*
* Must be used as a (second) base for classes implementing the IDataType/ISerialization interface.
*/
class TimezoneMixin
{
public:
TimezoneMixin(const TimezoneMixin &) = default;
explicit TimezoneMixin(const String & time_zone_name = "")
: has_explicit_time_zone(!time_zone_name.empty())
, time_zone(DateLUT::instance(time_zone_name))
, utc_time_zone(DateLUT::instance("UTC"))
{
}
const DateLUTImpl & getTimeZone() const { return time_zone; }
bool hasExplicitTimeZone() const { return has_explicit_time_zone; }
protected:
/// true if a time zone name was provided in the data type parameters; false if the default time zone is used.
bool has_explicit_time_zone;
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
};
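The mixin replaces the duplicated `time_zone`/`utc_time_zone` members that SerializationDateTime and SerializationDateTime64 carried before this change. A minimal sketch of an adopter (the class name is hypothetical):

``` cpp
/// Hypothetical adopter: inherit TimezoneMixin as a (second) base, as
/// SerializationDateTime/SerializationDateTime64 now do.
class MyTimezoneAwareType : public TimezoneMixin
{
public:
    explicit MyTimezoneAwareType(const String & tz_name) : TimezoneMixin(tz_name) {}

    /// time_zone and utc_time_zone are inherited and ready to use.
    bool usesDefaultTimeZone() const { return !hasExplicitTimeZone(); }
};
```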

View File

@ -40,6 +40,7 @@ public:
: ReadBufferFromFileBase(buf_size, existing_memory, alignment),
reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_)
{
prefetch_buffer.alignment = alignment;
}
~AsynchronousReadBufferFromFileDescriptor() override;

View File

@ -88,7 +88,11 @@ struct Memory : boost::noncopyable, Allocator
}
else
{
size_t new_capacity = align(new_size + pad_right, alignment);
size_t new_capacity = align(new_size, alignment) + pad_right;
size_t diff = new_capacity - m_capacity;
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, diff);
m_data = static_cast<char *>(Allocator::realloc(m_data, m_capacity, new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity - pad_right;
@ -101,6 +105,9 @@ private:
if (!alignment)
return value;
if (!(value % alignment))
return value;
return (value + alignment - 1) / alignment * alignment;
}
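The effect of moving `pad_right` out of the alignment round-up: the usable size (`m_capacity - pad_right`) is now always a multiple of the alignment, which the old formula did not guarantee. A worked example with assumed values:

``` cpp
#include <cstddef>

/// Same round-up as Memory::align; the values below are illustrative.
constexpr size_t alignUp(size_t value, size_t alignment)
{
    return (value + alignment - 1) / alignment * alignment;
}

/// new_size = 100, pad_right = 15, alignment = 64
static_assert(alignUp(100 + 15, 64) == 128);       /// old: capacity 128, usable 113 (unaligned)
static_assert(alignUp(100, 64) + 15 == 143);       /// new: capacity 143, usable 128 (aligned)
```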
@ -112,12 +119,10 @@ private:
return;
}
size_t padded_capacity = m_capacity + pad_right;
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, padded_capacity);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
size_t new_capacity = align(padded_capacity, alignment);
size_t new_capacity = align(m_capacity, alignment) + pad_right;
m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity - pad_right;

View File

@ -5,6 +5,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/MemorySanitizer.h>
#include <base/errnoToString.h>
#include <Poco/Event.h>
#include <future>
@ -151,6 +152,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
else
{
bytes_read += res;
__msan_unpoison(request.buf, res);
}
}

View File

@ -1806,6 +1806,68 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
namespace
{
bool checkZooKeeperConfigIsLocal(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
for (const auto & key : keys)
{
if (startsWith(key, "node"))
{
String host = config.getString(config_name + "." + key + ".host");
if (isLocalAddress(DNSResolver::instance().resolveHost(host)))
return true;
}
}
return false;
}
}
bool Context::tryCheckClientConnectionToMyKeeperCluster() const
{
try
{
/// If our server is part of the main Keeper cluster
if (checkZooKeeperConfigIsLocal(getConfigRef(), "zookeeper"))
{
LOG_DEBUG(shared->log, "Keeper server is participant of the main zookeeper cluster, will try to connect to it");
getZooKeeper();
/// Connected, return true
return true;
}
else
{
Poco::Util::AbstractConfiguration::Keys keys;
getConfigRef().keys("auxiliary_zookeepers", keys);
/// If our server is part of some auxiliary_zookeeper
for (const auto & aux_zk_name : keys)
{
if (checkZooKeeperConfigIsLocal(getConfigRef(), "auxiliary_zookeepers." + aux_zk_name))
{
LOG_DEBUG(shared->log, "Our Keeper server is participant of the auxiliary zookeeper cluster ({}), will try to connect to it", aux_zk_name);
getAuxiliaryZooKeeper(aux_zk_name);
/// Connected, return true
return true;
}
}
}
/// Our server doesn't depend on our Keeper cluster
return true;
}
catch (...)
{
return false;
}
}
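Taken together with `initializeKeeperDispatcher(bool start_async)` below, the intended startup flow is roughly the following (a paraphrased sketch; `global_context` is illustrative, not quoted from the server startup code):

``` cpp
/// Sketch of the intended startup decision (paraphrased, not the literal server code):
/// if we can already reach our Keeper cluster, or we don't depend on one,
/// internal Keeper may finish its startup asynchronously.
bool can_start_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
global_context->initializeKeeperDispatcher(/* start_async = */ can_start_async);
```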
UInt32 Context::getZooKeeperSessionUptime() const
{
std::lock_guard lock(shared->zookeeper_mutex);
@ -1833,7 +1895,7 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
}
void Context::initializeKeeperDispatcher() const
void Context::initializeKeeperDispatcher(bool start_async) const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_dispatcher_mutex);
@ -1844,8 +1906,21 @@ void Context::initializeKeeperDispatcher() const
const auto & config = getConfigRef();
if (config.has("keeper_server"))
{
bool is_standalone_app = getApplicationType() == ApplicationType::KEEPER;
if (start_async)
{
assert(!is_standalone_app);
LOG_INFO(shared->log, "Connected to ZooKeeper (or Keeper) before internal Keeper start or we don't depend on our Keeper cluster"
", will wait for Keeper asynchronously");
}
else
{
LOG_INFO(shared->log, "Cannot connect to ZooKeeper (or Keeper) before internal Keeper start,"
"will wait for Keeper synchronously");
}
shared->keeper_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_dispatcher->initialize(config, getApplicationType() == ApplicationType::KEEPER);
shared->keeper_dispatcher->initialize(config, is_standalone_app, start_async);
}
#endif
}

View File

@ -13,6 +13,7 @@
#include <Common/MultiVersion.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/RemoteHostFilter.h>
#include <Common/isLocalAddress.h>
#include <base/types.h>
#if !defined(ARCADIA_BUILD)
@ -664,12 +665,18 @@ public:
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
/// Try to connect to Keeper using get(Auxiliary)ZooKeeper. Useful for
/// internal Keeper start (checks the connection to some other node). Returns true
/// if we connected successfully (without exception) or if our ZooKeeper client
/// is configured for some other cluster that doesn't include our node.
bool tryCheckClientConnectionToMyKeeperCluster() const;
UInt32 getZooKeeperSessionUptime() const;
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
#endif
void initializeKeeperDispatcher() const;
void initializeKeeperDispatcher(bool start_async) const;
void shutdownKeeperDispatcher() const;
void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);

View File

@ -128,10 +128,10 @@ BlockIO InterpreterDescribeQuery::execute()
{
for (const auto & column : columns)
{
column.type->forEachSubcolumn([&](const auto & name, const auto & type, const auto & path)
IDataType::forEachSubcolumn([&](const auto & path, const auto & name, const auto & data)
{
res_columns[0]->insert(Nested::concatenateName(column.name, name));
res_columns[1]->insert(type->getName());
res_columns[1]->insert(data.type->getName());
/// It's not trivial to calculate the default expression for a subcolumn.
/// So, leave it empty.
@ -150,7 +150,7 @@ BlockIO InterpreterDescribeQuery::execute()
res_columns[6]->insertDefault();
res_columns[7]->insert(1u);
});
}, column.type->getDefaultSerialization(), column.type, nullptr);
}
}

View File

@ -18,7 +18,7 @@
#include <Parsers/parseQuery.h>
#include <IO/WriteHelpers.h>
#include <Core/Defines.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
@ -524,7 +524,8 @@ std::vector<TableNeededColumns> normalizeColumnNamesExtractNeeded(
size_t count = countTablesWithColumn(tables, short_name);
if (count > 1 || aliases.count(short_name))
/// isValidIdentifierBegin is required to be consistent with TableJoin::deduplicateAndQualifyColumnNames
if (count > 1 || aliases.count(short_name) || !isValidIdentifierBegin(short_name.at(0)))
{
const auto & table = tables[*table_pos];
IdentifierSemantic::setColumnLongName(*ident, table.table); /// table.column -> table_alias.column

View File

@ -1,5 +1,4 @@
#include <Common/SquashingTransform.h>
#include <iostream>
#include <Interpreters/SquashingTransform.h>
namespace DB

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/SquashingTransform.h>
#include <Interpreters/SquashingTransform.h>
namespace DB
{

View File

@ -286,7 +286,7 @@ void KeeperTCPHandler::runImpl()
return;
}
if (keeper_dispatcher->hasLeader())
if (keeper_dispatcher->checkInit() && keeper_dispatcher->hasLeader())
{
try
{
@ -306,7 +306,8 @@ void KeeperTCPHandler::runImpl()
}
else
{
LOG_WARNING(log, "Ignoring user request, because no alive leader exist");
String reason = keeper_dispatcher->checkInit() ? "server is not initialized yet" : "no alive leader exists";
LOG_WARNING(log, "Ignoring user request, because {}", reason);
sendHandshake(false);
return;
}

View File

@ -35,8 +35,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
[&](const ISerialization::SubstreamPath & substream_path)
{
++stream_counts[ISerialization::getFileNameForStream(column, substream_path)];
},
{});
});
}
NameSet remove_files;

View File

@ -98,7 +98,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size;
}, {});
});
return size;
}

View File

@ -39,19 +39,21 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, const ASTPtr & effective_codec_desc)
{
IDataType::StreamCallbackWithType callback = [&] (const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
ISerialization::StreamCallback callback = [&](const auto & substream_path)
{
assert(!substream_path.empty());
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
/// Shared offsets for Nested type.
if (compressed_streams.count(stream_name))
return;
const auto & subtype = substream_path.back().data.type;
CompressionCodecPtr compression_codec;
/// If we can use a special codec then just get it
if (ISerialization::isSpecialCompressionAllowed(substream_path))
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, subtype.get(), default_codec);
else /// otherwise return only generic codecs and don't use info about data_type
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
@ -63,7 +65,8 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column,
compressed_streams.emplace(stream_name, stream);
};
column.type->enumerateStreams(serializations[column.name], callback);
ISerialization::SubstreamPath path;
serializations[column.name]->enumerateStreams(path, callback, column.type, nullptr);
}
namespace

View File

@ -90,17 +90,20 @@ void MergeTreeDataPartWriterWide::addStreams(
const NameAndTypePair & column,
const ASTPtr & effective_codec_desc)
{
IDataType::StreamCallbackWithType callback = [&] (const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
ISerialization::StreamCallback callback = [&](const auto & substream_path)
{
assert(!substream_path.empty());
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
/// Shared offsets for Nested type.
if (column_streams.count(stream_name))
return;
const auto & subtype = substream_path.back().data.type;
CompressionCodecPtr compression_codec;
/// If we can use a special codec then just get it
if (ISerialization::isSpecialCompressionAllowed(substream_path))
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, subtype.get(), default_codec);
else /// otherwise return only generic codecs and don't use info about the data_type
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
@ -113,7 +116,8 @@ void MergeTreeDataPartWriterWide::addStreams(
settings.max_compress_block_size);
};
column.type->enumerateStreams(serializations[column.name], callback);
ISerialization::SubstreamPath path;
serializations[column.name]->enumerateStreams(path, callback, column.type, nullptr);
}

View File

@ -220,7 +220,7 @@ void MergeTreeReaderCompact::readData(
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, state, nullptr);
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), *temp_column);
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())

View File

@ -2,12 +2,12 @@
#include <base/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <Parsers/queryToString.h>
#include <Interpreters/SquashingTransform.h>
#include <Processors/Transforms/TTLTransform.h>
#include <Processors/Transforms/TTLCalcTransform.h>
#include <Common/SquashingTransform.h>
#include <Processors/Transforms/DistinctSortedTransform.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
@ -337,15 +337,14 @@ static NameToNameVector collectFilesForRenames(
{
/// Collect counts for shared streams of different columns. As an example, Nested columns have a shared stream with array sizes.
std::map<String, size_t> stream_counts;
for (const NameAndTypePair & column : source_part->getColumns())
for (const auto & column : source_part->getColumns())
{
auto serialization = source_part->getSerializationForColumn(column);
serialization->enumerateStreams(
[&](const ISerialization::SubstreamPath & substream_path)
{
++stream_counts[ISerialization::getFileNameForStream(column, substream_path)];
},
{});
});
}
NameToNameVector rename_vector;

View File

@ -141,10 +141,8 @@ IMergeTreeDataPart::Checksums checkDataPart(
[&](const ISerialization::SubstreamPath & substream_path)
{
String projection_file_name = ISerialization::getFileNameForStream(projection_column, substream_path) + ".bin";
checksums_data.files[projection_file_name]
= checksum_compressed_file(disk, projection_path + projection_file_name);
},
{});
checksums_data.files[projection_file_name] = checksum_compressed_file(disk, projection_path + projection_file_name);
});
}
}
@ -221,7 +219,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
{
String file_name = ISerialization::getFileNameForStream(column, substream_path) + ".bin";
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
}, {});
});
}
}
else

View File

@ -406,7 +406,7 @@ void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & c
storage.files[stream_name].data_file_path,
columns.getCodecOrDefault(name_and_type.name),
storage.max_compress_block_size);
}, settings.path);
});
settings.getter = createStreamGetter(name_and_type, written_streams);
@ -427,7 +427,7 @@ void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & c
mark.offset = stream_it->second.plain_offset + stream_it->second.plain->count();
out_marks.emplace_back(file.column_index, mark);
}, settings.path);
});
serialization->serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
@ -441,7 +441,7 @@ void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & c
if (streams.end() == it)
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
it->second.compressed.next();
}, settings.path);
});
}
@ -627,13 +627,12 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
* If this is a data type with multiple streams, get the first stream, which we assume holds the real row count.
* (Example: for the Array data type, the first stream contains array sizes, and the number of array sizes is the number of arrays.)
*/
ISerialization::SubstreamPath substream_root_path;
auto serialization = column.type->getDefaultSerialization();
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (filename.empty())
filename = ISerialization::getFileNameForStream(column, substream_path);
}, substream_root_path);
});
Files::const_iterator it = files.find(filename);
if (files.end() == it)
@ -750,9 +749,8 @@ IStorage::ColumnSizeByName StorageLog::getColumnSizes() const
size.data_compressed += file_sizes[fileName(it->second.data_file_path)];
};
ISerialization::SubstreamPath substream_path;
auto serialization = column.type->getDefaultSerialization();
serialization->enumerateStreams(stream_callback, substream_path);
serialization->enumerateStreams(stream_callback);
}
return column_sizes;

View File

@ -448,9 +448,8 @@ void StorageTinyLog::addFiles(const NameAndTypePair & column)
}
};
ISerialization::SubstreamPath substream_path;
auto serialization = type->getDefaultSerialization();
serialization->enumerateStreams(stream_callback, substream_path);
serialization->enumerateStreams(stream_callback);
}
@ -544,9 +543,8 @@ IStorage::ColumnSizeByName StorageTinyLog::getColumnSizes() const
size.data_compressed += file_sizes[fileName(it->second.data_file_path)];
};
ISerialization::SubstreamPath substream_path;
auto serialization = column.type->getDefaultSerialization();
serialization->enumerateStreams(stream_callback, substream_path);
serialization->enumerateStreams(stream_callback);
}
return column_sizes;

View File

@ -1309,7 +1309,7 @@ if __name__ == '__main__':
parser.add_argument('-j', '--jobs', default=1, nargs='?', type=int, help='Run all tests in parallel')
parser.add_argument('--test-runs', default=1, nargs='?', type=int, help='Run each test many times (useful for e.g. flaky check)')
parser.add_argument('-U', '--unified', default=3, type=int, help='output NUM lines of unified context')
parser.add_argument('-r', '--server-check-retries', default=30, type=int, help='Num of tries to execute SELECT 1 before tests started')
parser.add_argument('-r', '--server-check-retries', default=90, type=int, help='Num of tries to execute SELECT 1 before tests started')
parser.add_argument('--db-engine', help='Database engine name')
parser.add_argument('--replicated-database', action='store_true', default=False, help='Run tests with Replicated database engine')
parser.add_argument('--fast-tests-only', action='store_true', default=False, help='Run only fast tests (the tests without the "no-fasttest" tag)')

View File

@ -2320,6 +2320,9 @@ class ClickHouseInstance:
def replace_config(self, path_to_config, replacement):
self.exec_in_container(["bash", "-c", "echo '{}' > {}".format(replacement, path_to_config)])
def replace_in_config(self, path_to_config, replace, replacement):
self.exec_in_container(["bash", "-c", f"sed -i 's/{replace}/{replacement}/g' {path_to_config}"])
def create_dir(self, destroy_dir=True):
"""Create the instance directory and all the needed files there."""

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>non_existing_node</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>non_existing_node</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml'], stay_alive=True)
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def test_smoke():
try:
cluster.start()
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_alive", b"aaaa")
finally:
cluster.shutdown()

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,16 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
<node index="3">
<host>node3</host>
<port>9181</port>
</node>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,90 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/keeper_conf.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/keeper_conf.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/keeper_conf.xml'], stay_alive=True)
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start(node):
node.start_clickhouse()
def test_start_offline(started_cluster):
p = Pool(3)
try:
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_alive", b"aaaa")
node1.stop_clickhouse()
node2.stop_clickhouse()
node3.stop_clickhouse()
time.sleep(3)
p.map(start, [node2, node3])
assert node2.contains_in_log("Cannot connect to ZooKeeper (or Keeper) before internal Keeper start")
assert node3.contains_in_log("Cannot connect to ZooKeeper (or Keeper) before internal Keeper start")
node2_zk = get_fake_zk("node2")
node2_zk.create("/test_dead", b"data")
finally:
p.map(start, [node1, node2, node3])
def test_start_non_existing(started_cluster):
p = Pool(3)
try:
node1.stop_clickhouse()
node2.stop_clickhouse()
node3.stop_clickhouse()
node1.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper1.xml', 'node3', 'non_existing_node')
node2.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper2.xml', 'node3', 'non_existing_node')
time.sleep(3)
p.map(start, [node2, node1])
assert node1.contains_in_log("Cannot connect to ZooKeeper (or Keeper) before internal Keeper start")
assert node2.contains_in_log("Cannot connect to ZooKeeper (or Keeper) before internal Keeper start")
node2_zk = get_fake_zk("node2")
node2_zk.create("/test_non_exising", b"data")
finally:
node1.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper1.xml', 'non_existing_node', 'node3')
node2.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper2.xml', 'non_existing_node', 'node3')
p.map(start, [node1, node2, node3])
def test_restart_third_node(started_cluster):
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_restart", b"aaaa")
node3.restart_clickhouse()
assert node3.contains_in_log("Connected to ZooKeeper (or Keeper) before internal Keeper start")

View File

@ -1,2 +1,4 @@
1 hello 1 world world 1
2 hello 0 world 1
1 321 1 123 123 1
2 321 0 0 123 1

View File

@ -15,3 +15,21 @@ LEFT JOIN
arrayJoin([1, 3]) AS k,
'world'
) AS t2 ON t1.k = t2.k;
SELECT
t1.*,
t2.*,
123,
isConstant('world')
FROM
(
SELECT
arrayJoin([1, 2]) AS k,
321
) AS t1
LEFT JOIN
(
SELECT
arrayJoin([1, 3]) AS k,
123
) AS t2 ON t1.k = t2.k;

View File

@ -1,10 +1,6 @@
100 0 99 4950
100 0 99 4950
100 0 99 4950
100 0 99 4950
100 0 99 4950
100 0 99 4950
100 0 99 4950
100 0 99 4950
100 0 99 4950
100 0 99 4950
30 0 54 810
30 0 54 810
30 0 54 810
30 0 54 810
30 0 54 810
30 0 54 810

View File

@ -8,7 +8,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
NUM_REPLICAS=10
NUM_REPLICAS=6
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -n -q "
@ -20,17 +20,17 @@ done
function thread {
i=0 retries=300
while [[ $i -lt $retries ]]; do # server can be dead
$CLICKHOUSE_CLIENT --insert_quorum 5 --insert_quorum_parallel 1 --query "INSERT INTO r$1 SELECT $2" && break
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 1 --query "INSERT INTO r$1 SELECT $2" && break
((++i))
sleep 0.1
done
}
for i in $(seq 1 $NUM_REPLICAS); do
for j in {0..9}; do
for j in {0..4}; do
a=$((($i - 1) * 10 + $j))
# Note: making 100 connections simultaneously is a mini-DoS when the server is built with sanitizers and the CI environment is overloaded.
# Note: making 30 connections simultaneously is a mini-DoS when the server is built with sanitizers and the CI environment is overloaded.
# That's why we repeat "socket timeout" errors.
thread $i $a 2>&1 | grep -v -P 'SOCKET_TIMEOUT|NETWORK_ERROR|^$' &
done
@ -46,5 +46,5 @@ for i in $(seq 1 $NUM_REPLICAS); do
done
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS r$i;"
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS r$i SYNC;"
done

View File

@ -1,16 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --multiquery --query "DROP TABLE IF EXISTS t; CREATE TABLE t (x UInt64) ENGINE = Memory;"
${CLICKHOUSE_CLIENT} --query "SELECT number FROM numbers(1000)" > /dev/null
${CLICKHOUSE_CLIENT} --multiquery --query "SYSTEM FLUSH LOGS;
WITH ProfileEvents['NetworkSendBytes'] AS bytes
SELECT bytes >= 8000 AND bytes < 9000 ? 1 : bytes FROM system.query_log
WHERE current_database = currentDatabase() AND query_kind = 'Select' AND event_date >= yesterday() AND type = 2 ORDER BY event_time DESC LIMIT 1;"
${CLICKHOUSE_CLIENT} --query "DROP TABLE t"

View File

@ -29,7 +29,7 @@ populate_table_bg () {
$CLICKHOUSE_CLIENT --query "
INSERT INTO join_block_test
SELECT toString(number) as id, number * number as num
FROM system.numbers LIMIT 3000000
FROM system.numbers LIMIT 500000
" >/dev/null
) &
}

Some files were not shown because too many files have changed in this diff