Merge branch 'master' into fix_int8_test_standalone_keeper_mode

Igor Nikonov 2022-09-09 22:03:02 +02:00 committed by GitHub
commit 06070fe0ae
47 changed files with 571 additions and 321 deletions

View File

@ -1,8 +1,15 @@
#!/bin/bash
# shellcheck disable=SC2086,SC2001,SC2046,SC2030,SC2031
set -eux
set -x
# core.COMM.PID-TID
sysctl kernel.core_pattern='core.%e.%p-%P'
set -e
set -u
set -o pipefail
trap "exit" INT TERM
# The watchdog is in the separate process group, so we have to kill it separately
# if the script terminates earlier.
@ -87,6 +94,19 @@ function configure
# TODO figure out which ones are needed
cp -av --dereference "$repo_dir"/tests/config/config.d/listen.xml db/config.d
cp -av --dereference "$script_dir"/query-fuzzer-tweaks-users.xml db/users.d
cat > db/config.d/core.xml <<EOL
<clickhouse>
<core_dump>
<!-- 100GiB -->
<size_limit>107374182400</size_limit>
</core_dump>
<!-- NOTE: no need to configure core_path,
since clickhouse is not started as daemon (via clickhouse start)
-->
<core_path>$PWD</core_path>
</clickhouse>
EOL
}
function watchdog
@ -180,7 +200,6 @@ handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
gcore
backtrace full
thread apply all backtrace full
info registers

View File

@ -8,6 +8,9 @@ dmesg --clear
set -x
# core.COMM.PID-TID
sysctl kernel.core_pattern='core.%e.%p-%P'
# Thread Fuzzer allows to check more permutations of possible thread scheduling
# and find more potential issues.
@ -104,6 +107,19 @@ EOL
</default>
</profiles>
</clickhouse>
EOL
cat > /etc/clickhouse-server/config.d/core.xml <<EOL
<clickhouse>
<core_dump>
<!-- 100GiB -->
<size_limit>107374182400</size_limit>
</core_dump>
<!-- NOTE: no need to configure core_path,
since clickhouse is not started as daemon (via clickhouse start)
-->
<core_path>$PWD</core_path>
</clickhouse>
EOL
}
@ -160,7 +176,6 @@ handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
gcore
backtrace full
thread apply all backtrace full
info registers
@ -504,8 +519,7 @@ done
clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%'), rowNumberInAllBlocks() LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
# Core dumps (see gcore)
# Default filename is 'core.PROCESS_ID'
# Core dumps
for core in core.*; do
pigz $core
mv $core.gz /test_output/

View File

@ -10,7 +10,7 @@ Makes the server "forget" about the existence of a table, a materialized view, o
**Syntax**
``` sql
DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY]
DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
```
Detaching does not delete the data or metadata of a table, a materialized view or a dictionary. If an entity was not detached `PERMANENTLY`, on the next server launch the server will read the metadata and recall the table/view/dictionary again. If an entity was detached `PERMANENTLY`, there will be no automatic recall.
@ -24,6 +24,8 @@ Note that you can not detach permanently the table which is already detached (te
Also you can not [DROP](../../sql-reference/statements/drop#drop-table) the detached table, or [CREATE TABLE](../../sql-reference/statements/create/table.md) with the same name as detached permanently, or replace it with the other table with [RENAME TABLE](../../sql-reference/statements/rename.md) query.
The `SYNC` modifier executes the action without delay.
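For instance, a minimal illustrative query combining the modifiers from the syntax above (the table name is hypothetical):

``` sql
DETACH TABLE IF EXISTS test.events PERMANENTLY SYNC;
```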
**Example**
Creating a table:

View File

@ -6,7 +6,7 @@ sidebar_label: DROP
# DROP Statements
Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist.
Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist. If the `SYNC` modifier is specified, the entity is dropped without delay.
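For instance, a minimal illustrative query using the new modifier (database and table names are hypothetical):

``` sql
DROP TABLE IF EXISTS db.hits SYNC;
```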
## DROP DATABASE
@ -15,7 +15,7 @@ Deletes all tables inside the `db` database, then deletes the `db` database itse
Syntax:
``` sql
DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster]
DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster] [SYNC]
```
## DROP TABLE
@ -25,7 +25,7 @@ Deletes the table.
Syntax:
``` sql
DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster]
DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
```
## DROP DICTIONARY
@ -35,7 +35,7 @@ Deletes the dictionary.
Syntax:
``` sql
DROP DICTIONARY [IF EXISTS] [db.]name
DROP DICTIONARY [IF EXISTS] [db.]name [SYNC]
```
## DROP USER
@ -95,7 +95,7 @@ Deletes a view. Views can be deleted by a `DROP TABLE` command as well but `DROP
Syntax:
``` sql
DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster]
DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
```
## DROP FUNCTION

View File

@ -1,5 +1,5 @@
---
slug: /en/development/tests
slug: /zh/development/tests
sidebar_position: 70
sidebar_label: Testing
title: ClickHouse Testing

View File

@ -50,7 +50,7 @@ ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr &&
if (!offsets_concrete)
throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::LOGICAL_ERROR);
if (!offsets_concrete->empty() && data)
if (!offsets_concrete->empty() && data && !data->empty())
{
Offset last_offset = offsets_concrete->getData().back();

View File

@ -116,8 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
}
};
ISerialization::SubstreamPath path;
column_type->getDefaultSerialization()->enumerateStreams(path, callback, column_type);
auto serialization = column_type->getDefaultSerialization();
serialization->enumerateStreams(callback, column_type);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());

View File

@ -84,18 +84,20 @@ void IDataType::forEachSubcolumn(
{
for (size_t i = 0; i < subpath.size(); ++i)
{
if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, i + 1))
size_t prefix_len = i + 1;
if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, prefix_len))
{
auto name = ISerialization::getSubcolumnNameForStream(subpath, i + 1);
auto subdata = ISerialization::createFromPath(subpath, i);
auto name = ISerialization::getSubcolumnNameForStream(subpath, prefix_len);
auto subdata = ISerialization::createFromPath(subpath, prefix_len);
callback(subpath, name, subdata);
}
subpath[i].visited = true;
}
};
SubstreamPath path;
data.serialization->enumerateStreams(path, callback_with_data, data);
ISerialization::EnumerateStreamsSettings settings;
settings.position_independent_encoding = false;
data.serialization->enumerateStreams(settings, callback_with_data, data);
}
template <typename Ptr>
@ -118,33 +120,38 @@ Ptr IDataType::getForSubcolumn(
return res;
}
bool IDataType::hasSubcolumn(const String & subcolumn_name) const
{
return tryGetSubcolumnType(subcolumn_name) != nullptr;
}
DataTypePtr IDataType::tryGetSubcolumnType(const String & subcolumn_name) const
{
SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withType(getPtr());
return getForSubcolumn<DataTypePtr>(subcolumn_name, data, &SubstreamData::type, false);
}
DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
{
SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withType(getPtr());
return getForSubcolumn<DataTypePtr>(subcolumn_name, data, &SubstreamData::type, true);
}
ColumnPtr IDataType::tryGetSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withColumn(column);
return getForSubcolumn<ColumnPtr>(subcolumn_name, data, &SubstreamData::column, false);
}
ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr };
auto data = SubstreamData(getDefaultSerialization()).withColumn(column);
return getForSubcolumn<ColumnPtr>(subcolumn_name, data, &SubstreamData::column, true);
}
SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const
{
SubstreamData data = { serialization, nullptr, nullptr, nullptr };
auto data = SubstreamData(serialization);
return getForSubcolumn<SerializationPtr>(subcolumn_name, data, &SubstreamData::serialization, true);
}
@ -154,7 +161,7 @@ Names IDataType::getSubcolumnNames() const
forEachSubcolumn([&](const auto &, const auto & name, const auto &)
{
res.push_back(name);
}, { getDefaultSerialization(), nullptr, nullptr, nullptr });
}, SubstreamData(getDefaultSerialization()));
return res;
}

View File

@ -79,6 +79,8 @@ public:
/// Data type id. It's used for runtime type checks.
virtual TypeIndex getTypeId() const = 0;
bool hasSubcolumn(const String & subcolumn_name) const;
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const;
DataTypePtr getSubcolumnType(const String & subcolumn_name) const;

View File

@ -73,24 +73,24 @@ String ISerialization::SubstreamPath::toString() const
}
void ISerialization::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
path.push_back(Substream::Regular);
path.back().data = data;
callback(path);
path.pop_back();
settings.path.push_back(Substream::Regular);
settings.path.back().data = data;
callback(settings.path);
settings.path.pop_back();
}
void ISerialization::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void ISerialization::enumerateStreams(
const StreamCallback & callback,
const DataTypePtr & type,
const ColumnPtr & column) const
{
enumerateStreams(path, callback, {getPtr(), nullptr, nullptr, nullptr});
}
void ISerialization::enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const
{
enumerateStreams(path, callback, {getPtr(), type, nullptr, nullptr});
EnumerateStreamsSettings settings;
auto data = SubstreamData(getPtr()).withType(type).withColumn(column);
enumerateStreams(settings, callback, data);
}
void ISerialization::serializeBinaryBulk(const IColumn & column, WriteBuffer &, size_t, size_t) const
@ -184,7 +184,7 @@ String ISerialization::getFileNameForStream(const NameAndTypePair & column, cons
return getFileNameForStream(column.getNameInStorage(), path);
}
static size_t isOffsetsOfNested(const ISerialization::SubstreamPath & path)
bool isOffsetsOfNested(const ISerialization::SubstreamPath & path)
{
if (path.empty())
return false;
@ -287,10 +287,13 @@ bool ISerialization::hasSubcolumnForPath(const SubstreamPath & path, size_t pref
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
{
assert(prefix_len < path.size());
assert(prefix_len <= path.size());
if (prefix_len == 0)
return {};
SubstreamData res = path[prefix_len].data;
for (ssize_t i = static_cast<ssize_t>(prefix_len) - 1; i >= 0; --i)
ssize_t last_elem = prefix_len - 1;
auto res = path[last_elem].data;
for (ssize_t i = last_elem - 1; i >= 0; --i)
{
const auto & creator = path[i].creator;
if (creator)

View File

@ -101,6 +101,30 @@ public:
struct SubstreamData
{
SubstreamData() = default;
SubstreamData(SerializationPtr serialization_)
: serialization(std::move(serialization_))
{
}
SubstreamData & withType(DataTypePtr type_)
{
type = std::move(type_);
return *this;
}
SubstreamData & withColumn(ColumnPtr column_)
{
column = std::move(column_);
return *this;
}
SubstreamData & withSerializationInfo(SerializationInfoPtr serialization_info_)
{
serialization_info = std::move(serialization_info_);
return *this;
}
SerializationPtr serialization;
DataTypePtr type;
ColumnPtr column;
@ -164,16 +188,22 @@ public:
using StreamCallback = std::function<void(const SubstreamPath &)>;
struct EnumerateStreamsSettings
{
SubstreamPath path;
bool position_independent_encoding = true;
};
virtual void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const;
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
void enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const;
/// Enumerate streams with default settings.
void enumerateStreams(
const StreamCallback & callback,
const DataTypePtr & type = nullptr,
const ColumnPtr & column = nullptr) const;
using OutputStreamGetter = std::function<WriteBuffer*(const SubstreamPath &)>;
using InputStreamGetter = std::function<ReadBuffer*(const SubstreamPath &)>;
@ -375,4 +405,6 @@ State * ISerialization::checkAndGetState(const StatePtr & state) const
return state_concrete;
}
bool isOffsetsOfNested(const ISerialization::SubstreamPath & path);
}

View File

@ -155,30 +155,30 @@ namespace
return column_offsets;
}
}
ColumnPtr arrayOffsetsToSizes(const IColumn & column)
{
const auto & column_offsets = assert_cast<const ColumnArray::ColumnOffsets &>(column);
MutableColumnPtr column_sizes = column_offsets.cloneEmpty();
if (column_offsets.empty())
return column_sizes;
const auto & offsets_data = column_offsets.getData();
auto & sizes_data = assert_cast<ColumnArray::ColumnOffsets &>(*column_sizes).getData();
sizes_data.resize(offsets_data.size());
IColumn::Offset prev_offset = 0;
for (size_t i = 0, size = offsets_data.size(); i < size; ++i)
ColumnPtr arrayOffsetsToSizes(const IColumn & column)
{
auto current_offset = offsets_data[i];
sizes_data[i] = current_offset - prev_offset;
prev_offset = current_offset;
}
const auto & column_offsets = assert_cast<const ColumnArray::ColumnOffsets &>(column);
MutableColumnPtr column_sizes = column_offsets.cloneEmpty();
return column_sizes;
if (column_offsets.empty())
return column_sizes;
const auto & offsets_data = column_offsets.getData();
auto & sizes_data = assert_cast<ColumnArray::ColumnOffsets &>(*column_sizes).getData();
sizes_data.resize(offsets_data.size());
IColumn::Offset prev_offset = 0;
for (size_t i = 0, size = offsets_data.size(); i < size; ++i)
{
auto current_offset = offsets_data[i];
sizes_data[i] = current_offset - prev_offset;
prev_offset = current_offset;
}
return column_sizes;
}
}
DataTypePtr SerializationArray::SubcolumnCreator::create(const DataTypePtr & prev) const
@ -197,41 +197,42 @@ ColumnPtr SerializationArray::SubcolumnCreator::create(const ColumnPtr & prev) c
}
void SerializationArray::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * type_array = data.type ? &assert_cast<const DataTypeArray &>(*data.type) : nullptr;
const auto * column_array = data.column ? &assert_cast<const ColumnArray &>(*data.column) : nullptr;
auto offsets_column = column_array ? column_array->getOffsetsPtr() : nullptr;
auto offsets = column_array ? column_array->getOffsetsPtr() : nullptr;
path.push_back(Substream::ArraySizes);
path.back().data =
{
auto offsets_serialization =
std::make_shared<SerializationNamed>(
std::make_shared<SerializationNumber<UInt64>>(),
"size" + std::to_string(getArrayLevel(path)), false),
data.type ? std::make_shared<DataTypeUInt64>() : nullptr,
offsets_column ? arrayOffsetsToSizes(*offsets_column) : nullptr,
data.serialization_info,
};
"size" + std::to_string(getArrayLevel(settings.path)), false);
callback(path);
auto offsets_column = offsets && !settings.position_independent_encoding
? arrayOffsetsToSizes(*offsets)
: offsets;
path.back() = Substream::ArrayElements;
path.back().data = data;
path.back().creator = std::make_shared<SubcolumnCreator>(offsets_column);
settings.path.push_back(Substream::ArraySizes);
settings.path.back().data = SubstreamData(offsets_serialization)
.withType(type_array ? std::make_shared<DataTypeUInt64>() : nullptr)
.withColumn(std::move(offsets_column))
.withSerializationInfo(data.serialization_info);
SubstreamData next_data =
{
nested,
type_array ? type_array->getNestedType() : nullptr,
column_array ? column_array->getDataPtr() : nullptr,
data.serialization_info,
};
callback(settings.path);
nested->enumerateStreams(path, callback, next_data);
path.pop_back();
settings.path.back() = Substream::ArrayElements;
settings.path.back().data = data;
settings.path.back().creator = std::make_shared<SubcolumnCreator>(offsets);
auto next_data = SubstreamData(nested)
.withType(type_array ? type_array->getNestedType() : nullptr)
.withColumn(column_array ? column_array->getDataPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
nested->enumerateStreams(settings, callback, next_data);
settings.path.pop_back();
}
void SerializationArray::serializeBinaryBulkStatePrefix(

View File

@ -36,7 +36,7 @@ public:
*/
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;
@ -79,6 +79,4 @@ private:
};
};
ColumnPtr arrayOffsetsToSizes(const IColumn & column);
}

View File

@ -41,30 +41,26 @@ SerializationLowCardinality::SerializationLowCardinality(const DataTypePtr & dic
}
void SerializationLowCardinality::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * column_lc = data.column ? &getColumnLowCardinality(*data.column) : nullptr;
SubstreamData dict_data =
{
dict_inner_serialization,
data.type ? dictionary_type : nullptr,
column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr,
data.serialization_info,
};
settings.path.push_back(Substream::DictionaryKeys);
auto dict_data = SubstreamData(dict_inner_serialization)
.withType(data.type ? dictionary_type : nullptr)
.withColumn(column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr)
.withSerializationInfo(data.serialization_info);
path.push_back(Substream::DictionaryKeys);
path.back().data = dict_data;
settings.path.back().data = dict_data;
dict_inner_serialization->enumerateStreams(settings, callback, dict_data);
dict_inner_serialization->enumerateStreams(path, callback, dict_data);
settings.path.back() = Substream::DictionaryIndexes;
settings.path.back().data = data;
path.back() = Substream::DictionaryIndexes;
path.back().data = data;
callback(path);
path.pop_back();
callback(settings.path);
settings.path.pop_back();
}
struct KeysSerializationVersion

View File

@ -18,7 +18,7 @@ public:
explicit SerializationLowCardinality(const DataTypePtr & dictionary_type);
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

View File

@ -257,19 +257,16 @@ void SerializationMap::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
}
void SerializationMap::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
SubstreamData next_data =
{
nested,
data.type ? assert_cast<const DataTypeMap &>(*data.type).getNestedType() : nullptr,
data.column ? assert_cast<const ColumnMap &>(*data.column).getNestedColumnPtr() : nullptr,
data.serialization_info,
};
auto next_data = SubstreamData(nested)
.withType(data.type ? assert_cast<const DataTypeMap &>(*data.type).getNestedType() : nullptr)
.withColumn(data.column ? assert_cast<const ColumnMap &>(*data.column).getNestedColumnPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
nested->enumerateStreams(path, callback, next_data);
nested->enumerateStreams(settings, callback, next_data);
}
void SerializationMap::serializeBinaryBulkStatePrefix(

View File

@ -32,7 +32,7 @@ public:
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

View File

@ -4,16 +4,16 @@ namespace DB
{
void SerializationNamed::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
addToPath(path);
path.back().data = data;
path.back().creator = std::make_shared<SubcolumnCreator>(name, escape_delimiter);
addToPath(settings.path);
settings.path.back().data = data;
settings.path.back().creator = std::make_shared<SubcolumnCreator>(name, escape_delimiter);
nested_serialization->enumerateStreams(path, callback, data);
path.pop_back();
nested_serialization->enumerateStreams(settings, callback, data);
settings.path.pop_back();
}
void SerializationNamed::serializeBinaryBulkStatePrefix(

View File

@ -26,7 +26,7 @@ public:
const String & getElementName() const { return name; }
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

View File

@ -38,38 +38,35 @@ ColumnPtr SerializationNullable::SubcolumnCreator::create(const ColumnPtr & prev
}
void SerializationNullable::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * type_nullable = data.type ? &assert_cast<const DataTypeNullable &>(*data.type) : nullptr;
const auto * column_nullable = data.column ? &assert_cast<const ColumnNullable &>(*data.column) : nullptr;
path.push_back(Substream::NullMap);
path.back().data =
{
std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false),
type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr,
column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr,
data.serialization_info,
};
auto null_map_serialization = std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false);
callback(path);
settings.path.push_back(Substream::NullMap);
auto null_map_data = SubstreamData(null_map_serialization)
.withType(type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr)
.withColumn(column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
path.back() = Substream::NullableElements;
path.back().creator = std::make_shared<SubcolumnCreator>(path.back().data.column);
path.back().data = data;
settings.path.back().data = null_map_data;
callback(settings.path);
SubstreamData next_data =
{
nested,
type_nullable ? type_nullable->getNestedType() : nullptr,
column_nullable ? column_nullable->getNestedColumnPtr() : nullptr,
data.serialization_info,
};
settings.path.back() = Substream::NullableElements;
settings.path.back().creator = std::make_shared<SubcolumnCreator>(null_map_data.column);
settings.path.back().data = data;
nested->enumerateStreams(path, callback, next_data);
path.pop_back();
auto next_data = SubstreamData(nested)
.withType(type_nullable ? type_nullable->getNestedType() : nullptr)
.withColumn(column_nullable ? column_nullable->getNestedColumnPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
nested->enumerateStreams(settings, callback, next_data);
settings.path.pop_back();
}
void SerializationNullable::serializeBinaryBulkStatePrefix(

View File

@ -14,7 +14,7 @@ public:
explicit SerializationNullable(const SerializationPtr & nested_) : nested(nested_) {}
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

View File

@ -148,39 +148,33 @@ ColumnPtr SerializationSparse::SubcolumnCreator::create(const ColumnPtr & prev)
}
void SerializationSparse::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
const auto * column_sparse = data.column ? &assert_cast<const ColumnSparse &>(*data.column) : nullptr;
size_t column_size = column_sparse ? column_sparse->size() : 0;
path.push_back(Substream::SparseOffsets);
path.back().data =
{
std::make_shared<SerializationNumber<UInt64>>(),
data.type ? std::make_shared<DataTypeUInt64>() : nullptr,
column_sparse ? column_sparse->getOffsetsPtr() : nullptr,
data.serialization_info,
};
settings.path.push_back(Substream::SparseOffsets);
auto offsets_data = SubstreamData(std::make_shared<SerializationNumber<UInt64>>())
.withType(data.type ? std::make_shared<DataTypeUInt64>() : nullptr)
.withColumn(column_sparse ? column_sparse->getOffsetsPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
callback(path);
settings.path.back().data = offsets_data;
callback(settings.path);
path.back() = Substream::SparseElements;
path.back().creator = std::make_shared<SubcolumnCreator>(path.back().data.column, column_size);
path.back().data = data;
settings.path.back() = Substream::SparseElements;
settings.path.back().creator = std::make_shared<SubcolumnCreator>(offsets_data.column, column_size);
settings.path.back().data = data;
SubstreamData next_data =
{
nested,
data.type,
column_sparse ? column_sparse->getValuesPtr() : nullptr,
data.serialization_info,
};
auto next_data = SubstreamData(nested)
.withType(data.type)
.withColumn(column_sparse ? column_sparse->getValuesPtr() : nullptr)
.withSerializationInfo(data.serialization_info);
nested->enumerateStreams(path, callback, next_data);
path.pop_back();
nested->enumerateStreams(settings, callback, next_data);
settings.path.pop_back();
}
void SerializationSparse::serializeBinaryBulkStatePrefix(

View File

@ -28,7 +28,7 @@ public:
Kind getKind() const override { return Kind::SPARSE; }
virtual void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

View File

@ -283,7 +283,7 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
}
void SerializationTuple::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
@ -293,15 +293,12 @@ void SerializationTuple::enumerateStreams(
for (size_t i = 0; i < elems.size(); ++i)
{
SubstreamData next_data =
{
elems[i],
type_tuple ? type_tuple->getElement(i) : nullptr,
column_tuple ? column_tuple->getColumnPtr(i) : nullptr,
info_tuple ? info_tuple->getElementInfo(i) : nullptr,
};
auto next_data = SubstreamData(elems[i])
.withType(type_tuple ? type_tuple->getElement(i) : nullptr)
.withColumn(column_tuple ? column_tuple->getColumnPtr(i) : nullptr)
.withSerializationInfo(info_tuple ? info_tuple->getElementInfo(i) : nullptr);
elems[i]->enumerateStreams(path, callback, next_data);
elems[i]->enumerateStreams(settings, callback, next_data);
}
}

View File

@ -34,7 +34,7 @@ public:
/** Each sub-column in a tuple is serialized in separate stream.
*/
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

View File

@ -5,11 +5,11 @@ namespace DB
{
void SerializationWrapper::enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const
{
nested_serialization->enumerateStreams(path, callback, data);
nested_serialization->enumerateStreams(settings, callback, data);
}
void SerializationWrapper::serializeBinaryBulkStatePrefix(

View File

@ -21,7 +21,7 @@ public:
Kind getKind() const override { return nested_serialization->getKind(); }
void enumerateStreams(
SubstreamPath & path,
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
const SubstreamData & data) const override;

View File

@ -163,7 +163,7 @@ BlockIO InterpreterDescribeQuery::execute()
res_columns[6]->insertDefault();
res_columns[7]->insert(1u);
}, { type->getDefaultSerialization(), type, nullptr, nullptr });
}, ISerialization::SubstreamData(type->getDefaultSerialization()).withType(type));
}
}

View File

@ -12,6 +12,7 @@
#include <Parsers/ASTFunction.h>
#include <utility>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/ObjectUtils.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Common/checkStackSize.h>
#include <Storages/ColumnsDescription.h>
@ -187,29 +188,56 @@ ActionsDAGPtr evaluateMissingDefaults(
return createExpressions(header, expr_list, save_unneeded_columns, context);
}
static bool arrayHasNoElementsRead(const IColumn & column)
static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
const NamesAndTypesList & available_columns, const Columns & res_columns)
{
const auto * column_array = typeid_cast<const ColumnArray *>(&column);
std::unordered_map<String, ColumnPtr> offsets_columns;
if (!column_array)
return false;
auto available_column = available_columns.begin();
for (size_t i = 0; i < available_columns.size(); ++i, ++available_column)
{
if (res_columns[i] == nullptr || isColumnConst(*res_columns[i]))
continue;
size_t size = column_array->size();
if (!size)
return false;
auto serialization = IDataType::getSerialization(*available_column);
serialization->enumerateStreams([&](const auto & subpath)
{
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
size_t data_size = column_array->getData().size();
if (data_size)
return false;
auto stream_name = ISerialization::getFileNameForStream(*available_column, subpath);
const auto & current_offsets_column = subpath.back().data.column;
size_t last_offset = column_array->getOffsets()[size - 1];
return last_offset != 0;
/// If for some reason multiple offsets columns are present
/// for the same nested data structure, choose the one that is not empty.
if (current_offsets_column && !current_offsets_column->empty())
{
auto & offsets_column = offsets_columns[stream_name];
if (!offsets_column)
offsets_column = current_offsets_column;
#ifndef NDEBUG
const auto & offsets_data = assert_cast<const ColumnUInt64 &>(*offsets_column).getData();
const auto & current_offsets_data = assert_cast<const ColumnUInt64 &>(*current_offsets_column).getData();
if (offsets_data != current_offsets_data)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Found non-equal columns with offsets (sizes: {} and {}) for stream {}",
offsets_data.size(), current_offsets_data.size(), stream_name);
#endif
}
}, available_column->type, res_columns[i]);
}
return offsets_columns;
}
void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot)
{
size_t num_columns = requested_columns.size();
@ -218,65 +246,79 @@ void fillMissingColumns(
"Invalid number of columns passed to fillMissingColumns. Expected {}, got {}",
num_columns, res_columns.size());
/// For a missing column of a nested data structure we must create not a column of empty
/// arrays, but a column of arrays of correct length.
/// For a missing column of a nested data structure
/// we must create not a column of empty arrays,
/// but a column of arrays of correct length.
/// First, collect offset columns for all arrays in the block.
auto offsets_columns = collectOffsetsColumns(available_columns, res_columns);
std::unordered_map<String, ColumnPtr> offset_columns;
/// Insert default values only for columns without default expressions.
auto requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
if (res_columns[i] == nullptr)
continue;
if (const auto * array = typeid_cast<const ColumnArray *>(res_columns[i].get()))
{
String offsets_name = Nested::extractTableName(requested_column->name);
auto & offsets_column = offset_columns[offsets_name];
/// If for some reason multiple offsets columns are present for the same nested data structure,
/// choose the one that is not empty.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
}
}
/// insert default values only for columns without default expressions
requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
const auto & [name, type] = *requested_column;
if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i]))
if (res_columns[i] && partially_read_columns.contains(name))
res_columns[i] = nullptr;
if (res_columns[i] == nullptr)
if (res_columns[i])
continue;
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
continue;
std::vector<ColumnPtr> current_offsets;
size_t num_dimensions = 0;
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
if (array_type && !offsets_columns.empty())
{
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
continue;
num_dimensions = getNumberOfDimensions(*array_type);
current_offsets.resize(num_dimensions);
String offsets_name = Nested::extractTableName(name);
auto offset_it = offset_columns.find(offsets_name);
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
if (offset_it != offset_columns.end() && array_type)
auto serialization = IDataType::getSerialization(*requested_column);
serialization->enumerateStreams([&](const auto & subpath)
{
const auto & nested_type = array_type->getNestedType();
ColumnPtr offsets_column = offset_it->second;
size_t nested_rows = typeid_cast<const ColumnUInt64 &>(*offsets_column).getData().back();
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
ColumnPtr nested_column =
nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();
size_t level = ISerialization::getArrayLevel(subpath);
assert(level < num_dimensions);
res_columns[i] = ColumnArray::create(nested_column, offsets_column);
}
else
auto stream_name = ISerialization::getFileNameForStream(*requested_column, subpath);
auto it = offsets_columns.find(stream_name);
if (it != offsets_columns.end())
current_offsets[level] = it->second;
});
for (size_t j = 0; j < num_dimensions; ++j)
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
if (!current_offsets[j])
{
current_offsets.resize(j);
break;
}
}
}
if (!current_offsets.empty())
{
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions);
size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
res_columns[i] = ColumnArray::create(res_columns[i], *it);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Core/Names.h>
#include <Interpreters/Context_fwd.h>
#include <Common/COW.h>
@ -43,6 +44,8 @@ void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot);
}

View File

@ -780,7 +780,7 @@ void ColumnsDescription::addSubcolumns(const String & name_in_storage, const Dat
"Cannot add subcolumn {}: column with this name already exists", subcolumn.name);
subcolumns.get<0>().insert(std::move(subcolumn));
}, {type_in_storage->getDefaultSerialization(), type_in_storage, nullptr, nullptr});
}, ISerialization::SubstreamData(type_in_storage->getDefaultSerialization()).withType(type_in_storage));
}
void ColumnsDescription::removeSubcolumns(const String & name_in_storage)

View File

@ -445,11 +445,11 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
column_name_to_position.clear();
column_name_to_position.reserve(new_columns.size());
size_t pos = 0;
for (const auto & column : columns)
column_name_to_position.emplace(column.name, pos++);
for (const auto & column : columns)
{
column_name_to_position.emplace(column.name, pos++);
auto it = serialization_infos.find(column.name);
auto serialization = it == serialization_infos.end()
? IDataType::getSerialization(column)
@ -461,7 +461,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
{
auto full_name = Nested::concatenateName(column.name, subname);
serializations.emplace(full_name, subdata.serialization);
}, {serialization, nullptr, nullptr, nullptr});
}, ISerialization::SubstreamData(serialization));
}
columns_description = ColumnsDescription(columns);
@ -1352,7 +1352,6 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
}
}
void IMergeTreeDataPart::appendFilesOfColumns(Strings & files)
{
files.push_back("columns.txt");

View File

@ -63,7 +63,13 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
{
try
{
DB::fillMissingColumns(res_columns, num_rows, requested_columns, metadata_snapshot);
NamesAndTypesList available_columns(columns_to_read.begin(), columns_to_read.end());
DB::fillMissingColumns(
res_columns, num_rows,
Nested::convertToSubcolumns(requested_columns),
Nested::convertToSubcolumns(available_columns),
partially_read_columns, metadata_snapshot);
should_evaluate_missing_defaults = std::any_of(
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
}
@ -201,20 +207,56 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
}
}
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
{
String table_name = Nested::extractTableName(column_name);
auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
{
Names offsets_streams;
serialization->enumerateStreams([&](const auto & subpath)
{
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto full_name = Nested::concatenateName(name_in_storage, subname);
offsets_streams.push_back(full_name);
});
return offsets_streams;
};
auto required_name_in_storage = Nested::extractTableName(required_column.getNameInStorage());
auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);
size_t max_matched_streams = 0;
ColumnPosition position;
/// Find column that has maximal number of matching
/// offsets columns with required_column.
for (const auto & part_column : data_part_info_for_read->getColumns())
{
if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
auto name_in_storage = Nested::extractTableName(part_column.name);
if (name_in_storage != required_name_in_storage)
continue;
auto offsets_streams = get_offsets_streams(data_part_info_for_read->getSerialization(part_column), name_in_storage);
NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end());
size_t i = 0;
for (; i < required_offsets_streams.size(); ++i)
{
auto position = data_part_info_for_read->getColumnPosition(part_column.getNameInStorage());
if (position && Nested::extractTableName(part_column.name) == table_name)
return position;
if (!offsets_streams_set.contains(required_offsets_streams[i]))
break;
}
if (i && (!position || i > max_matched_streams))
{
max_matched_streams = i;
position = data_part_info_for_read->getColumnPosition(part_column.name);
}
}
return {};
return position;
}
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const

View File

@ -92,7 +92,9 @@ protected:
MarkRanges all_mark_ranges;
using ColumnPosition = std::optional<size_t>;
ColumnPosition findColumnForOffsets(const String & column_name) const;
ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;
NameSet partially_read_columns;
private:
/// Alter conversions, which must be applied on fly if required

View File

@ -66,8 +66,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column,
compressed_streams.emplace(stream_name, stream);
};
ISerialization::SubstreamPath path;
data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type);
data_part->getSerialization(column.name)->enumerateStreams(callback, column.type);
}
namespace

View File

@ -121,7 +121,7 @@ void MergeTreeDataPartWriterWide::addStreams(
};
ISerialization::SubstreamPath path;
data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type);
data_part->getSerialization(column.name)->enumerateStreams(callback, column.type);
}
@ -255,10 +255,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
void MergeTreeDataPartWriterWide::writeSingleMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
size_t number_of_rows,
ISerialization::SubstreamPath & path)
size_t number_of_rows)
{
StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns, path);
StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns);
for (const auto & mark : marks)
flushMarkToFile(mark, number_of_rows);
}
@ -274,8 +273,7 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path)
WrittenOffsetColumns & offset_columns)
{
StreamsWithMarks result;
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
@ -300,7 +298,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();
result.push_back(stream_with_mark);
}, path);
});
return result;
}
@ -328,7 +326,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
}, serialize_settings.path);
});
}
/// Column must not be empty. (column.size() !== 0)
@ -366,7 +364,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
{
if (last_non_written_marks.contains(name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark);
last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns, serialize_settings.path);
last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns);
}
writeSingleGranule(
@ -390,7 +388,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
}
}
serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets)
@ -398,7 +396,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
offset_columns.insert(stream_name);
}
}, serialize_settings.path);
});
}
@ -553,7 +551,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
}
if (write_final_mark)
writeFinalMark(*it, offset_columns, serialize_settings.path);
writeFinalMark(*it, offset_columns);
}
}
@ -618,10 +616,9 @@ void MergeTreeDataPartWriterWide::finish(bool sync)
void MergeTreeDataPartWriterWide::writeFinalMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path)
WrittenOffsetColumns & offset_columns)
{
writeSingleMark(column, offset_columns, 0, path);
writeSingleMark(column, offset_columns, 0);
/// Memoize information about offsets
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
@ -631,7 +628,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
offset_columns.insert(stream_name);
}
}, path);
});
}
static void fillIndexGranularityImpl(

View File

@ -61,8 +61,7 @@ private:
/// Take offsets from column and return as MarkInCompressed file with stream name
StreamsWithMarks getCurrentMarksForColumn(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path);
WrittenOffsetColumns & offset_columns);
/// Write mark to disk using stream and rows count
void flushMarkToFile(
@ -73,13 +72,11 @@ private:
void writeSingleMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
size_t number_of_rows,
ISerialization::SubstreamPath & path);
size_t number_of_rows);
void writeFinalMark(
const NameAndTypePair & column,
WrittenOffsetColumns & offset_columns,
ISerialization::SubstreamPath & path);
WrittenOffsetColumns & offset_columns);
void addStreams(
const NameAndTypePair & column,

View File

@ -46,35 +46,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
try
{
size_t columns_num = columns_to_read.size();
column_positions.resize(columns_num);
read_only_offsets.resize(columns_num);
for (size_t i = 0; i < columns_num; ++i)
{
const auto & column_to_read = columns_to_read[i];
if (column_to_read.isSubcolumn())
{
auto storage_column_from_part = getColumnInPart(
{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});
if (!storage_column_from_part.type->tryGetSubcolumnType(column_to_read.getSubcolumnName()))
continue;
}
auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
if (!position && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
position = findColumnForOffsets(column_to_read.name);
read_only_offsets[i] = (position != std::nullopt);
}
column_positions[i] = std::move(position);
}
fillColumnPositions();
/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges);
@ -137,6 +109,44 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
}
void MergeTreeReaderCompact::fillColumnPositions()
{
size_t columns_num = columns_to_read.size();
column_positions.resize(columns_num);
read_only_offsets.resize(columns_num);
for (size_t i = 0; i < columns_num; ++i)
{
const auto & column_to_read = columns_to_read[i];
auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
bool is_array = isArray(column_to_read.type);
if (column_to_read.isSubcolumn())
{
auto storage_column_from_part = getColumnInPart(
{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});
auto subcolumn_name = column_to_read.getSubcolumnName();
if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name))
position.reset();
}
if (!position && is_array)
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
position = findColumnForOffsets(column_to_read);
read_only_offsets[i] = (position != std::nullopt);
}
column_positions[i] = std::move(position);
if (read_only_offsets[i])
partially_read_columns.insert(column_to_read.name);
}
}
size_t MergeTreeReaderCompact::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
@ -214,7 +224,8 @@ void MergeTreeReaderCompact::readData(
auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
{
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes))
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (only_offsets && !is_offsets)
return nullptr;
return data_buffer;

View File

@ -39,6 +39,7 @@ public:
private:
bool isContinuousReading(size_t mark, size_t column_position);
void fillColumnPositions();
ReadBuffer * data_buffer;
CompressedReadBufferBase * compressed_data_buffer;

View File

@ -33,13 +33,19 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
{})
, part_in_memory(std::move(data_part_))
{
for (const auto & [name, type] : columns_to_read)
for (const auto & column_to_read : columns_to_read)
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
if (!part_in_memory->block.has(name) && typeid_cast<const DataTypeArray *>(type.get()))
if (auto offset_position = findColumnForOffsets(name))
positions_for_offsets[name] = *offset_position;
if (typeid_cast<const DataTypeArray *>(column_to_read.type.get())
&& !tryGetColumnFromBlock(part_in_memory->block, column_to_read))
{
if (auto offsets_position = findColumnForOffsets(column_to_read))
{
positions_for_offsets[column_to_read.name] = *offsets_position;
partially_read_columns.insert(column_to_read.name);
}
}
}
}

View File

@ -16,7 +16,6 @@ namespace DB
namespace
{
using OffsetColumns = std::map<std::string, ColumnPtr>;
constexpr auto DATA_FILE_EXTENSION = ".bin";
}
@ -160,12 +159,18 @@ void MergeTreeReaderWide::addStreams(
const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type)
{
bool has_any_stream = false;
bool has_all_streams = true;
ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
if (streams.contains(stream_name))
{
has_any_stream = true;
return;
}
bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);
@ -173,8 +178,12 @@ void MergeTreeReaderWide::addStreams(
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
*/
if (!data_file_exists)
{
has_all_streams = false;
return;
}
has_any_stream = true;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
@ -186,6 +195,9 @@ void MergeTreeReaderWide::addStreams(
};
serialization->enumerateStreams(callback);
if (has_any_stream && !has_all_streams)
partially_read_columns.insert(name_and_type.name);
}
@ -283,6 +295,7 @@ void MergeTreeReaderWide::readData(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, current_task_last_mark, cache);
};
deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name_and_type.name];

View File

@ -95,7 +95,7 @@ protected:
++name_and_type;
}
fillMissingColumns(columns, src.rows(), column_names_and_types, /*metadata_snapshot=*/ nullptr);
fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));
return Chunk(std::move(columns), src.rows());

View File

@ -242,7 +242,7 @@ void StorageSystemPartsColumns::processNextStorage(
IDataType::forEachSubcolumn([&](const auto & subpath, const auto & name, const auto & data)
{
/// We count only final subcolumns, which are represented by files on disk
/// and skip intermediate suibcolumns of types Tuple and Nested.
/// and skip intermediate subcolumns of types Tuple and Nested.
if (isTuple(data.type) || isNested(data.type))
return;
@ -270,7 +270,7 @@ void StorageSystemPartsColumns::processNextStorage(
subcolumn_data_uncompressed_bytes.push_back(size.data_uncompressed);
subcolumn_marks_bytes.push_back(size.marks);
}, { serialization, column.type, nullptr, nullptr });
}, ISerialization::SubstreamData(serialization).withType(column.type));
if (columns_mask[src_index++])
columns[res_index++]->insert(subcolumn_names);

View File

@ -29,7 +29,11 @@ IMAGE_NAME = "clickhouse/fuzzer"
def get_run_command(pr_number, sha, download_url, workspace_path, image):
return (
f"docker run --network=host --volume={workspace_path}:/workspace "
f"docker run "
# For sysctl
"--privileged "
"--network=host "
f"--volume={workspace_path}:/workspace "
"--cap-add syslog --cap-add sys_admin --cap-add=SYS_PTRACE "
f'-e PR_TO_TEST={pr_number} -e SHA_TO_TEST={sha} -e BINARY_URL_TO_DOWNLOAD="{download_url}" '
f"{image}"

View File

@ -33,7 +33,7 @@ def get_run_command(
"docker run --cap-add=SYS_PTRACE "
# a static link, don't use S3_URL or S3_DOWNLOAD
"-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' "
# For dmesg
# For dmesg and sysctl
"--privileged "
f"--volume={build_path}:/package_folder "
f"--volume={result_folder}:/test_output "

View File

@ -0,0 +1,27 @@
Tuple(arr Nested(k1 Nested(k2 String, k3 String, k4 Int8), k5 Tuple(k6 String)), id Int8)
{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":"bbb","k4":0},{"k2":"ccc","k3":"","k4":0}],"k5":{"k6":""}}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}],"id":2}}
[['bbb','']] [['aaa','ccc']]
[['ddd','']] [['','']]
1
[[0,0]]
[[10,20]]
Tuple(arr Nested(k1 Nested(k2 String, k3 Nested(k4 Int8))), id Int8)
{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":[]}]}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k3":[{"k4":10}]},{"k2":"ccc","k3":[{"k4":20}]}]}],"id":2}}
[['aaa']] [[[]]]
[['bbb','ccc']] [[[10],[20]]]
1
[[[]]]
[[[10],[20]]]
Tuple(arr Nested(k1 Nested(k2 String, k4 Nested(k5 Int8)), k3 String), id Int8)
{"obj":{"arr":[{"k1":[],"k3":"qqq"},{"k1":[],"k3":"www"}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"aaa","k4":[]}],"k3":"eee"}],"id":2}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k4":[{"k5":10}]},{"k2":"ccc","k4":[{"k5":20}]}],"k3":"rrr"}],"id":3}}
['qqq','www'] [[],[]] [[],[]]
['eee'] [['aaa']] [[[]]]
['rrr'] [['bbb','ccc']] [[[10],[20]]]
1
[[],[]]
[[[]]]
[[[10],[20]]]

View File

@ -0,0 +1,48 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json_17;
SET allow_experimental_object_type = 1;
SET output_format_json_named_tuples_as_objects = 1;
CREATE TABLE t_json_17(obj JSON)
ENGINE = MergeTree ORDER BY tuple();
DROP FUNCTION IF EXISTS hasValidSizes17;
CREATE FUNCTION hasValidSizes17 AS (arr1, arr2) -> length(arr1) = length(arr2) AND arrayAll((x, y) -> length(x) = length(y), arr1, arr2);
SYSTEM STOP MERGES t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], "k5": {"k6": "foo"}}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k3, obj.arr.k1.k2 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k3, obj.arr.k1.k2)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4 FROM t_json_17 ORDER BY obj.id;
TRUNCATE TABLE t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "bbb", "k3": [{"k4": 10}]}, {"k2": "ccc", "k3": [{"k4": 20}]}]}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k2, obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k3.k4)) == count() FROM t_json_17;
SELECT obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
TRUNCATE TABLE t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k3": "qqq"}, {"k3": "www"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "aaa"}], "k3": "eee"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 3, "arr": [{"k1": [{"k2": "bbb", "k4": [{"k5": 10}]}, {"k2": "ccc", "k4": [{"k5": 20}]}], "k3": "rrr"}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k3, obj.arr.k1.k2, obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k4.k5)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
DROP FUNCTION hasValidSizes17;
DROP TABLE t_json_17;