Merge branch 'master' into fix-group-by-use-null-subquery-scope

Nikolai Kochetov 2024-07-12 12:29:16 +02:00
commit eafd7bc4a0
37 changed files with 612 additions and 139 deletions

contrib/azure vendored

@ -1 +1 @@
Subproject commit 92c94d7f37a43cc8fc4d466884a95f610c0593bf
Subproject commit ea3e19a7be08519134c643177d56c7484dfec884

contrib/pocketfft vendored

@ -1 +1 @@
Subproject commit 9efd4da52cf8d28d14531d14e43ad9d913807546
Subproject commit f4c1aa8aa9ce79ad39e80f2c9c41b92ead90fda3


@ -35,10 +35,9 @@ disable = '''
broad-except,
bare-except,
no-else-return,
global-statement
global-statement,
'''
[tool.pylint.SIMILARITIES]
# due to SQL
min-similarity-lines=1000


@ -38,10 +38,19 @@ namespace ErrorCodes
extern const int CANNOT_MREMAP;
}
void abortOnFailedAssertion(const String & description, void * const * trace, size_t trace_offset, size_t trace_size)
{
auto & logger = Poco::Logger::root();
LOG_FATAL(&logger, "Logical error: '{}'.", description);
if (trace)
LOG_FATAL(&logger, "Stack trace (when copying this message, always include the lines below):\n\n{}", StackTrace::toString(trace, trace_offset, trace_size));
abort();
}
void abortOnFailedAssertion(const String & description)
{
LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", description);
abort();
StackTrace st;
abortOnFailedAssertion(description, st.getFramePointers().data(), st.getOffset(), st.getSize());
}
bool terminate_on_any_exception = false;
@ -58,7 +67,7 @@ void handle_error_code(const std::string & msg, int code, bool remote, const Exc
#ifdef ABORT_ON_LOGICAL_ERROR
if (code == ErrorCodes::LOGICAL_ERROR)
{
abortOnFailedAssertion(msg);
abortOnFailedAssertion(msg, trace.data(), 0, trace.size());
}
#endif


@ -25,8 +25,6 @@ namespace DB
class AtomicLogger;
[[noreturn]] void abortOnFailedAssertion(const String & description);
/// This flag can be set for testing purposes - to check that no exceptions are thrown.
extern bool terminate_on_any_exception;
@ -167,6 +165,8 @@ protected:
mutable std::vector<StackTrace::FramePointers> capture_thread_frame_pointers;
};
[[noreturn]] void abortOnFailedAssertion(const String & description, void * const * trace, size_t trace_offset, size_t trace_size);
[[noreturn]] void abortOnFailedAssertion(const String & description);
std::string getExceptionStackTraceString(const std::exception & e);
std::string getExceptionStackTraceString(std::exception_ptr e);


@ -235,7 +235,7 @@ bool NamedCollectionFactory::loadIfNot(std::lock_guard<std::mutex> & lock)
loadFromConfig(context->getConfigRef(), lock);
loadFromSQL(lock);
if (metadata_storage->supportsPeriodicUpdate())
if (metadata_storage->isReplicated())
{
update_task = context->getSchedulePool().createTask("NamedCollectionsMetadataStorage", [this]{ updateFunc(); });
update_task->activate();
@ -357,6 +357,13 @@ void NamedCollectionFactory::reloadFromSQL()
add(std::move(collections), lock);
}
bool NamedCollectionFactory::usesReplicatedStorage()
{
std::lock_guard lock(mutex);
loadIfNot(lock);
return metadata_storage->isReplicated();
}
void NamedCollectionFactory::updateFunc()
{
LOG_TRACE(log, "Named collections background updating thread started");


@ -34,6 +34,8 @@ public:
void updateFromSQL(const ASTAlterNamedCollectionQuery & query);
bool usesReplicatedStorage();
void loadIfNot();
void shutdown();


@ -67,7 +67,7 @@ public:
virtual bool removeIfExists(const std::string & path) = 0;
virtual bool supportsPeriodicUpdate() const = 0;
virtual bool isReplicated() const = 0;
virtual bool waitUpdate(size_t /* timeout */) { return false; }
};
@ -89,7 +89,7 @@ public:
~LocalStorage() override = default;
bool supportsPeriodicUpdate() const override { return false; }
bool isReplicated() const override { return false; }
std::vector<std::string> list() const override
{
@ -221,7 +221,7 @@ public:
~ZooKeeperStorage() override = default;
bool supportsPeriodicUpdate() const override { return true; }
bool isReplicated() const override { return true; }
/// Return true if children changed.
bool waitUpdate(size_t timeout) override
@ -465,14 +465,14 @@ void NamedCollectionsMetadataStorage::writeCreateQuery(const ASTCreateNamedColle
storage->write(getFileName(query.collection_name), serializeAST(*normalized_query), replace);
}
bool NamedCollectionsMetadataStorage::supportsPeriodicUpdate() const
bool NamedCollectionsMetadataStorage::isReplicated() const
{
return storage->supportsPeriodicUpdate();
return storage->isReplicated();
}
bool NamedCollectionsMetadataStorage::waitUpdate()
{
if (!storage->supportsPeriodicUpdate())
if (!storage->isReplicated())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Periodic updates are not supported");
const auto & config = Context::getGlobalContextInstance()->getConfigRef();


@ -30,7 +30,7 @@ public:
/// Return true if update was made
bool waitUpdate();
bool supportsPeriodicUpdate() const;
bool isReplicated() const;
private:
class INamedCollectionsStorage;


@ -545,7 +545,7 @@ std::string StackTrace::toString() const
return toStringCached(frame_pointers, offset, size);
}
std::string StackTrace::toString(void ** frame_pointers_raw, size_t offset, size_t size)
std::string StackTrace::toString(void * const * frame_pointers_raw, size_t offset, size_t size)
{
__msan_unpoison(frame_pointers_raw, size * sizeof(*frame_pointers_raw));


@ -59,7 +59,7 @@ public:
const FramePointers & getFramePointers() const { return frame_pointers; }
std::string toString() const;
static std::string toString(void ** frame_pointers, size_t offset, size_t size);
static std::string toString(void * const * frame_pointers, size_t offset, size_t size);
static void dropCache();
/// @param fatal - if true, will process inline frames (slower)


@ -346,6 +346,7 @@ class IColumn;
\
M(Bool, ignore_on_cluster_for_replicated_udf_queries, false, "Ignore ON CLUSTER clause for replicated UDF management queries.", 0) \
M(Bool, ignore_on_cluster_for_replicated_access_entities_queries, false, "Ignore ON CLUSTER clause for replicated access entities management queries.", 0) \
M(Bool, ignore_on_cluster_for_replicated_named_collections_queries, false, "Ignore ON CLUSTER clause for replicated named collections management queries.", 0) \
/** Settings for testing hedged requests */ \
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \


@ -76,6 +76,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"azure_sdk_max_retries", 10, 10, "Maximum number of retries in azure sdk"},
{"azure_sdk_retry_initial_backoff_ms", 10, 10, "Minimal backoff between retries in azure sdk"},
{"azure_sdk_retry_max_backoff_ms", 1000, 1000, "Maximal backoff between retries in azure sdk"},
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
}},


@ -11,6 +11,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTTLElement.h>
#include <Poco/String.h>
@ -211,6 +212,13 @@ void DDLLoadingDependencyVisitor::extractTableNameFromArgument(const ASTFunction
qualified_name.database = table_identifier->getDatabaseName();
qualified_name.table = table_identifier->shortName();
}
else if (arg->as<ASTSubquery>())
{
/// Allow IN subquery.
/// Do not add tables from the subquery into dependencies,
/// because CREATE will succeed anyway.
return;
}
else
{
assert(false);


@ -107,12 +107,24 @@ void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name,
StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String & name)
{
// it is important to call the destructors of not_in_use without
// locked mutex to avoid potential deadlock.
DetachedTables not_in_use;
std::lock_guard lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name);
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleanupDetachedTables();
StoragePtr table;
{
std::lock_guard lock(mutex);
table = DatabaseOrdinary::detachTableUnlocked(name);
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleanupDetachedTables();
}
if (!not_in_use.empty())
{
not_in_use.clear();
LOG_DEBUG(log, "Finished removing not used detached tables");
}
return table;
}
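The point of this hunk is to make the destructors of not_in_use run after the mutex is released. A minimal Python sketch of the same pattern, using a hypothetical Registry class rather than ClickHouse code:

import threading

class Registry:
    def __init__(self):
        self._lock = threading.Lock()
        self._tables = {}
        self._detached = {}

    def detach(self, name):
        # Do the bookkeeping under the lock and only collect what must be destroyed.
        with self._lock:
            table = self._tables.pop(name)
            self._detached[name] = table
            not_in_use = self._collect_unused_detached_locked()
        # Drop the collected objects outside the lock, so teardown that is slow or
        # that needs the same lock again cannot block or deadlock the registry.
        not_in_use.clear()
        return table

    def _collect_unused_detached_locked(self):
        # Placeholder: would move detached entries that are no longer referenced
        # out of self._detached and return them.
        return {}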


@ -4,6 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
@ -13,14 +14,16 @@ namespace DB
BlockIO InterpreterAlterNamedCollectionQuery::execute()
{
auto current_context = getContext();
const auto & query = query_ptr->as<const ASTAlterNamedCollectionQuery &>();
const auto updated_query = removeOnClusterClauseIfNeeded(query_ptr, getContext());
const auto & query = updated_query->as<const ASTAlterNamedCollectionQuery &>();
current_context->checkAccess(AccessType::ALTER_NAMED_COLLECTION, query.collection_name);
if (!query.cluster.empty())
{
DDLQueryOnClusterParams params;
return executeDDLQueryOnCluster(query_ptr, current_context, params);
return executeDDLQueryOnCluster(updated_query, current_context, params);
}
NamedCollectionFactory::instance().updateFromSQL(query);


@ -4,6 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
@ -13,14 +14,16 @@ namespace DB
BlockIO InterpreterCreateNamedCollectionQuery::execute()
{
auto current_context = getContext();
const auto & query = query_ptr->as<const ASTCreateNamedCollectionQuery &>();
const auto updated_query = removeOnClusterClauseIfNeeded(query_ptr, getContext());
const auto & query = updated_query->as<const ASTCreateNamedCollectionQuery &>();
current_context->checkAccess(AccessType::CREATE_NAMED_COLLECTION, query.collection_name);
if (!query.cluster.empty())
{
DDLQueryOnClusterParams params;
return executeDDLQueryOnCluster(query_ptr, current_context, params);
return executeDDLQueryOnCluster(updated_query, current_context, params);
}
NamedCollectionFactory::instance().createFromSQL(query);


@ -4,6 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
@ -13,14 +14,16 @@ namespace DB
BlockIO InterpreterDropNamedCollectionQuery::execute()
{
auto current_context = getContext();
const auto & query = query_ptr->as<const ASTDropNamedCollectionQuery &>();
const auto updated_query = removeOnClusterClauseIfNeeded(query_ptr, getContext());
const auto & query = updated_query->as<const ASTDropNamedCollectionQuery &>();
current_context->checkAccess(AccessType::DROP_NAMED_COLLECTION, query.collection_name);
if (!query.cluster.empty())
{
DDLQueryOnClusterParams params;
return executeDDLQueryOnCluster(query_ptr, current_context, params);
return executeDDLQueryOnCluster(updated_query, current_context, params);
}
NamedCollectionFactory::instance().removeFromSQL(query);


@ -15,6 +15,10 @@
#include <Parsers/Access/ASTCreateUserQuery.h>
#include <Parsers/Access/ASTDropAccessEntityQuery.h>
#include <Parsers/Access/ASTGrantQuery.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Parsers/ASTAlterNamedCollectionQuery.h>
#include <Parsers/ASTDropNamedCollectionQuery.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
namespace DB
@ -38,6 +42,13 @@ static bool isAccessControlQuery(const ASTPtr & query)
|| query->as<ASTGrantQuery>();
}
static bool isNamedCollectionQuery(const ASTPtr & query)
{
return query->as<ASTCreateNamedCollectionQuery>()
|| query->as<ASTDropNamedCollectionQuery>()
|| query->as<ASTAlterNamedCollectionQuery>();
}
ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, const WithoutOnClusterASTRewriteParams & params)
{
auto * query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get());
@ -50,7 +61,10 @@ ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, c
&& context->getUserDefinedSQLObjectsStorage().isReplicated())
|| (isAccessControlQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_access_entities_queries
&& context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE)))
&& context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE))
|| (isNamedCollectionQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_named_collections_queries
&& NamedCollectionFactory::instance().usesReplicatedStorage()))
{
LOG_DEBUG(getLogger("removeOnClusterClauseIfNeeded"), "ON CLUSTER clause was ignored for query {}", query->getID());
return query_on_cluster->getRewrittenASTWithoutOnCluster(params);
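With the new ignore_on_cluster_for_replicated_named_collections_queries setting enabled and the named collections metadata kept in replicated (Keeper-backed) storage, the ON CLUSTER clause of named collection DDL is stripped locally and propagation is left to the storage itself. A hedged client-side sketch using the third-party clickhouse-driver package; the package, host and client usage are assumptions, not part of this commit:

from clickhouse_driver import Client

client = Client("node_with_keeper")

# The server rewrites the query without ON CLUSTER when the setting is enabled
# and NamedCollectionFactory reports a replicated storage.
client.execute(
    "DROP NAMED COLLECTION IF EXISTS test_nc ON CLUSTER replicated_nc_nodes_cluster",
    settings={"ignore_on_cluster_for_replicated_named_collections_queries": 1},
)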


@ -8,6 +8,7 @@
#include <Common/logger_useful.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnArray.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromFileBase.h>
@ -30,6 +31,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/NestedUtils.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <parquet/file_reader.h>
@ -111,7 +113,7 @@ struct DeltaLakeMetadataImpl
std::set<String> result_files;
NamesAndTypesList current_schema;
DataLakePartitionColumns current_partition_columns;
const auto checkpoint_version = getCheckpointIfExists(result_files);
const auto checkpoint_version = getCheckpointIfExists(result_files, current_schema, current_partition_columns);
if (checkpoint_version)
{
@ -205,9 +207,32 @@ struct DeltaLakeMetadataImpl
Poco::Dynamic::Var json = parser.parse(json_str);
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
// std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
// object->stringify(oss);
// LOG_TEST(log, "Metadata: {}", oss.str());
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
object->stringify(oss);
LOG_TEST(log, "Metadata: {}", oss.str());
if (object->has("metaData"))
{
const auto metadata_object = object->get("metaData").extract<Poco::JSON::Object::Ptr>();
const auto schema_object = metadata_object->getValue<String>("schemaString");
Poco::JSON::Parser p;
Poco::Dynamic::Var fields_json = parser.parse(schema_object);
const Poco::JSON::Object::Ptr & fields_object = fields_json.extract<Poco::JSON::Object::Ptr>();
auto current_schema = parseMetadata(fields_object);
if (file_schema.empty())
{
file_schema = current_schema;
}
else if (file_schema != current_schema)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from files with different schema is not possible "
"({} is different from {})",
file_schema.toString(), current_schema.toString());
}
}
if (object->has("add"))
{
@ -230,7 +255,12 @@ struct DeltaLakeMetadataImpl
const auto value = partition_values->getValue<String>(partition_name);
auto name_and_type = file_schema.tryGetByName(partition_name);
if (!name_and_type)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", partition_name);
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"No such column in schema: {} (schema: {})",
partition_name, file_schema.toNamesAndTypesDescription());
}
auto field = getFieldValue(value, name_and_type->type);
current_partition_columns.emplace_back(*name_and_type, field);
@ -246,52 +276,35 @@ struct DeltaLakeMetadataImpl
auto path = object->get("remove").extract<Poco::JSON::Object::Ptr>()->getValue<String>("path");
result.erase(fs::path(configuration->getPath()) / path);
}
if (object->has("metaData"))
{
const auto metadata_object = object->get("metaData").extract<Poco::JSON::Object::Ptr>();
const auto schema_object = metadata_object->getValue<String>("schemaString");
Poco::JSON::Parser p;
Poco::Dynamic::Var fields_json = parser.parse(schema_object);
Poco::JSON::Object::Ptr fields_object = fields_json.extract<Poco::JSON::Object::Ptr>();
const auto fields = fields_object->get("fields").extract<Poco::JSON::Array::Ptr>();
NamesAndTypesList current_schema;
for (size_t i = 0; i < fields->size(); ++i)
{
const auto field = fields->getObject(static_cast<UInt32>(i));
auto column_name = field->getValue<String>("name");
auto type = field->getValue<String>("type");
auto is_nullable = field->getValue<bool>("nullable");
std::string physical_name;
auto schema_metadata_object = field->get("metadata").extract<Poco::JSON::Object::Ptr>();
if (schema_metadata_object->has("delta.columnMapping.physicalName"))
physical_name = schema_metadata_object->getValue<String>("delta.columnMapping.physicalName");
else
physical_name = column_name;
LOG_TEST(log, "Found column: {}, type: {}, nullable: {}, physical name: {}",
column_name, type, is_nullable, physical_name);
current_schema.push_back({physical_name, getFieldType(field, "type", is_nullable)});
}
if (file_schema.empty())
{
file_schema = current_schema;
}
else if (file_schema != current_schema)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from files with different schema is not possible "
"({} is different from {})",
file_schema.toString(), current_schema.toString());
}
}
}
}
NamesAndTypesList parseMetadata(const Poco::JSON::Object::Ptr & metadata_json)
{
NamesAndTypesList schema;
const auto fields = metadata_json->get("fields").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i < fields->size(); ++i)
{
const auto field = fields->getObject(static_cast<UInt32>(i));
auto column_name = field->getValue<String>("name");
auto type = field->getValue<String>("type");
auto is_nullable = field->getValue<bool>("nullable");
std::string physical_name;
auto schema_metadata_object = field->get("metadata").extract<Poco::JSON::Object::Ptr>();
if (schema_metadata_object->has("delta.columnMapping.physicalName"))
physical_name = schema_metadata_object->getValue<String>("delta.columnMapping.physicalName");
else
physical_name = column_name;
LOG_TEST(log, "Found column: {}, type: {}, nullable: {}, physical name: {}",
column_name, type, is_nullable, physical_name);
schema.push_back({physical_name, getFieldType(field, "type", is_nullable)});
}
return schema;
}
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable)
{
if (field->isObject(type_key))
@ -505,7 +518,10 @@ struct DeltaLakeMetadataImpl
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Arrow error: {}", _s.ToString()); \
} while (false)
size_t getCheckpointIfExists(std::set<String> & result)
size_t getCheckpointIfExists(
std::set<String> & result,
NamesAndTypesList & file_schema,
DataLakePartitionColumns & file_partition_columns)
{
const auto version = readLastCheckpointIfExists();
if (!version)
@ -526,7 +542,8 @@ struct DeltaLakeMetadataImpl
auto columns = ParquetSchemaReader(*buf, format_settings).readSchema();
/// Read only columns that we need.
columns.filterColumns(NameSet{"add", "remove"});
auto filter_column_names = NameSet{"add", "metaData"};
columns.filterColumns(filter_column_names);
Block header;
for (const auto & column : columns)
header.insert({column.type->createColumn(), column.type, column.name});
@ -540,9 +557,6 @@ struct DeltaLakeMetadataImpl
ArrowMemoryPool::instance(),
&reader));
std::shared_ptr<arrow::Schema> file_schema;
THROW_ARROW_NOT_OK(reader->GetSchema(&file_schema));
ArrowColumnToCHColumn column_reader(
header, "Parquet",
format_settings.parquet.allow_missing_columns,
@ -553,29 +567,85 @@ struct DeltaLakeMetadataImpl
std::shared_ptr<arrow::Table> table;
THROW_ARROW_NOT_OK(reader->ReadTable(&table));
Chunk res = column_reader.arrowTableToCHChunk(table, reader->parquet_reader()->metadata()->num_rows());
const auto & res_columns = res.getColumns();
Chunk chunk = column_reader.arrowTableToCHChunk(table, reader->parquet_reader()->metadata()->num_rows());
auto res_block = header.cloneWithColumns(chunk.detachColumns());
res_block = Nested::flatten(res_block);
if (res_columns.size() != 2)
{
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Unexpected number of columns: {} (having: {}, expected: {})",
res_columns.size(), res.dumpStructure(), header.dumpStructure());
}
const auto * nullable_path_column = assert_cast<const ColumnNullable *>(res_block.getByName("add.path").column.get());
const auto & path_column = assert_cast<const ColumnString &>(nullable_path_column->getNestedColumn());
const auto * nullable_schema_column = assert_cast<const ColumnNullable *>(res_block.getByName("metaData.schemaString").column.get());
const auto & schema_column = assert_cast<const ColumnString &>(nullable_schema_column->getNestedColumn());
auto partition_values_column_raw = res_block.getByName("add.partitionValues").column;
const auto & partition_values_column = assert_cast<const ColumnMap &>(*partition_values_column_raw);
const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[0].get());
const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0));
const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn());
for (size_t i = 0; i < path_column.size(); ++i)
{
const auto filename = String(path_column.getDataAt(i));
if (filename.empty())
const auto metadata = String(schema_column.getDataAt(i));
if (!metadata.empty())
{
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(metadata);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
auto current_schema = parseMetadata(object);
if (file_schema.empty())
{
file_schema = current_schema;
LOG_TEST(log, "Processed schema from checkpoint: {}", file_schema.toString());
}
else if (file_schema != current_schema)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from files with different schema is not possible "
"({} is different from {})",
file_schema.toString(), current_schema.toString());
}
}
}
for (size_t i = 0; i < path_column.size(); ++i)
{
const auto path = String(path_column.getDataAt(i));
if (path.empty())
continue;
LOG_TEST(log, "Adding {}", filename);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration->getPath()) / filename);
auto filename = fs::path(path).filename().string();
auto it = file_partition_columns.find(filename);
if (it == file_partition_columns.end())
{
Field map;
partition_values_column.get(i, map);
auto partition_values_map = map.safeGet<Map>();
if (!partition_values_map.empty())
{
auto & current_partition_columns = file_partition_columns[filename];
for (const auto & map_value : partition_values_map)
{
const auto tuple = map_value.safeGet<Tuple>();
const auto partition_name = tuple[0].safeGet<String>();
auto name_and_type = file_schema.tryGetByName(partition_name);
if (!name_and_type)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"No such column in schema: {} (schema: {})",
partition_name, file_schema.toString());
}
const auto value = tuple[1].safeGet<String>();
auto field = getFieldValue(value, name_and_type->type);
current_partition_columns.emplace_back(std::move(name_and_type.value()), std::move(field));
LOG_TEST(log, "Partition {} value is {} (for {})", partition_name, value, filename);
}
}
}
LOG_TEST(log, "Adding {}", path);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration->getPath()) / path);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", filename);
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
}
return version;
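getCheckpointIfExists now extracts the table schema and per-file partition values from the checkpoint parquet in addition to the list of added files. A hedged Python sketch of the checkpoint layout it reads; pyarrow and the local path are assumptions, not part of this commit:

import json
import pyarrow.parquet as pq

# A Delta Lake checkpoint is a parquet file under _delta_log/ whose rows carry either
# an "add" action (data file path plus partitionValues) or a "metaData" action
# (schemaString, a JSON document with a "fields" array of name/type/nullable/metadata).
table = pq.read_table(
    "_delta_log/00000000000000000010.checkpoint.parquet",
    columns=["add", "metaData"],
)

for row in table.to_pylist():
    meta = row.get("metaData")
    if meta and meta.get("schemaString"):
        fields = json.loads(meta["schemaString"])["fields"]
        print("schema:", [(f["name"], f["type"]) for f in fields])

    add = row.get("add")
    if add and add.get("path"):
        print("data file:", add["path"], "partition values:", add.get("partitionValues"))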


@ -41,6 +41,7 @@ public:
auto object_storage = base_configuration->createObjectStorage(context, /* is_readonly */true);
DataLakeMetadataPtr metadata;
NamesAndTypesList schema_from_metadata;
const bool use_schema_from_metadata = columns_.empty();
if (base_configuration->format == "auto")
base_configuration->format = "Parquet";
@ -50,8 +51,9 @@ public:
try
{
metadata = DataLakeMetadata::create(object_storage, base_configuration, context);
schema_from_metadata = metadata->getTableSchema();
configuration->setPaths(metadata->getDataFiles());
if (use_schema_from_metadata)
schema_from_metadata = metadata->getTableSchema();
}
catch (...)
{
@ -66,7 +68,7 @@ public:
return std::make_shared<IStorageDataLake<DataLakeMetadata>>(
base_configuration, std::move(metadata), configuration, object_storage,
context, table_id_,
columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_,
use_schema_from_metadata ? ColumnsDescription(schema_from_metadata) : columns_,
constraints_, comment_, format_settings_);
}


@ -206,23 +206,25 @@ Chunk StorageObjectStorageSource::generate()
if (!partition_columns.empty() && chunk_size && chunk.hasColumns())
{
auto partition_values = partition_columns.find(filename);
for (const auto & [name_and_type, value] : partition_values->second)
if (partition_values != partition_columns.end())
{
if (!read_from_format_info.source_header.has(name_and_type.name))
continue;
for (const auto & [name_and_type, value] : partition_values->second)
{
if (!read_from_format_info.source_header.has(name_and_type.name))
continue;
const auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name);
auto partition_column = name_and_type.type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst();
const auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name);
auto partition_column = name_and_type.type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst();
/// This column is filled with default value now, remove it.
chunk.erase(column_pos);
/// This column is filled with default value now, remove it.
chunk.erase(column_pos);
/// Add correct values.
if (chunk.hasColumns())
chunk.addColumn(column_pos, std::move(partition_column));
else
chunk.addColumn(std::move(partition_column));
/// Add correct values.
if (column_pos < chunk.getNumColumns())
chunk.addColumn(column_pos, std::move(partition_column));
else
chunk.addColumn(std::move(partition_column));
}
}
}
return chunk;


@ -5,20 +5,21 @@
#include <base/hex.h>
#include <base/interpolate.h>
#include <Common/FailPoint.h>
#include <Common/Macros.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
#include <Common/StringUtils.h>
#include <Common/ThreadFuzzer.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/escapeForFileName.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Common/noexcept_scope.h>
#include <Common/randomDelay.h>
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>
#include <Common/ThreadFuzzer.h>
#include <Common/FailPoint.h>
#include <Common/randomDelay.h>
#include <Core/ServerUUID.h>
@ -5272,6 +5273,8 @@ void StorageReplicatedMergeTree::flushAndPrepareForShutdown()
if (shutdown_prepared_called.exchange(true))
return;
LOG_TRACE(log, "Start preparing for shutdown");
try
{
auto settings_ptr = getSettings();
@ -5282,7 +5285,11 @@ void StorageReplicatedMergeTree::flushAndPrepareForShutdown()
stopBeingLeader();
if (attach_thread)
{
attach_thread->shutdown();
LOG_TRACE(log, "The attach thread is shutdown");
}
restarting_thread.shutdown(/* part_of_full_shutdown */true);
/// Explicitly set the event, because the restarting thread will not set it again
@ -5295,6 +5302,8 @@ void StorageReplicatedMergeTree::flushAndPrepareForShutdown()
shutdown_deadline.emplace(std::chrono::system_clock::now());
throw;
}
LOG_TRACE(log, "Finished preparing for shutdown");
}
void StorageReplicatedMergeTree::partialShutdown()
@ -5332,6 +5341,8 @@ void StorageReplicatedMergeTree::shutdown(bool)
if (shutdown_called.exchange(true))
return;
LOG_TRACE(log, "Shutdown started");
flushAndPrepareForShutdown();
if (!shutdown_deadline.has_value())
@ -5374,6 +5385,7 @@ void StorageReplicatedMergeTree::shutdown(bool)
/// Wait for all of them
std::lock_guard lock(data_parts_exchange_ptr->rwlock);
}
LOG_TRACE(log, "Shutdown finished");
}


@ -35,7 +35,6 @@ void registerStorageFuzzJSON(StorageFactory & factory);
void registerStorageS3(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageS3Queue(StorageFactory & factory);
void registerStorageAzureQueue(StorageFactory & factory);
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory);
@ -45,6 +44,10 @@ void registerStorageIceberg(StorageFactory & factory);
#endif
#endif
#if USE_AZURE_BLOB_STORAGE
void registerStorageAzureQueue(StorageFactory & factory);
#endif
#if USE_HDFS
#if USE_HIVE
void registerStorageHive(StorageFactory & factory);


@ -15,3 +15,4 @@ warn_return_any = True
no_implicit_reexport = True
strict_equality = True
extra_checks = True
ignore_missing_imports = True


@ -15,7 +15,7 @@ import upload_result_helper
from build_check import get_release_or_pr
from ci_config import CI
from ci_metadata import CiMetadata
from ci_utils import GHActions, normalize_string
from ci_utils import GHActions, normalize_string, Shell
from clickhouse_helper import (
CiLogsCredentials,
ClickHouseHelper,
@ -53,6 +53,7 @@ from stopwatch import Stopwatch
from tee_popen import TeePopen
from ci_cache import CiCache
from ci_settings import CiSettings
from ci_buddy import CIBuddy
from version_helper import get_version_from_repo
# pylint: disable=too-many-lines
@ -262,6 +263,8 @@ def check_missing_images_on_dockerhub(
def _pre_action(s3, indata, pr_info):
print("Clear dmesg")
Shell.run("sudo dmesg --clear ||:")
CommitStatusData.cleanup()
JobReport.cleanup()
BuildResult.cleanup()
@ -1118,6 +1121,14 @@ def main() -> int:
### POST action: start
elif args.post:
if Shell.check(
"sudo dmesg -T | grep -q -e 'Out of memory: Killed process' -e 'oom_reaper: reaped process' -e 'oom-kill:constraint=CONSTRAINT_NONE'"
):
print("WARNING: OOM while job execution")
CIBuddy(dry_run=not pr_info.is_release).post_error(
"Out Of Memory", job_name=_get_ext_check_name(args.job_name)
)
job_report = JobReport.load() if JobReport.exist() else None
if job_report:
ch_helper = ClickHouseHelper()

tests/ci/ci_buddy.py (new file)

@ -0,0 +1,88 @@
import json
import os
import boto3
import requests
from botocore.exceptions import ClientError
from pr_info import PRInfo
from ci_utils import Shell
class CIBuddy:
_HEADERS = {"Content-Type": "application/json"}
def __init__(self, dry_run=False):
self.repo = os.getenv("GITHUB_REPOSITORY", "")
self.dry_run = dry_run
res = self._get_webhooks()
self.test_channel = ""
self.dev_ci_channel = ""
if res:
self.test_channel = json.loads(res)["test_channel"]
self.dev_ci_channel = json.loads(res)["ci_channel"]
self.job_name = os.getenv("CHECK_NAME", "unknown")
pr_info = PRInfo()
self.pr_number = pr_info.number
self.head_ref = pr_info.head_ref
self.commit_url = pr_info.commit_html_url
@staticmethod
def _get_webhooks():
name = "ci_buddy_web_hooks"
session = boto3.Session(region_name="us-east-1") # Replace with your region
ssm_client = session.client("ssm")
json_string = None
try:
response = ssm_client.get_parameter(
Name=name,
WithDecryption=True, # Set to True if the parameter is a SecureString
)
json_string = response["Parameter"]["Value"]
except ClientError as e:
print(f"An error occurred: {e}")
return json_string
def post(self, message, dry_run=None):
if dry_run is None:
dry_run = self.dry_run
print(f"Posting slack message, dry_run [{dry_run}]")
if dry_run:
url = self.test_channel
else:
url = self.dev_ci_channel
data = {"text": message}
try:
requests.post(url, headers=self._HEADERS, data=json.dumps(data), timeout=10)
except Exception as e:
print(f"ERROR: Failed to post message, ex {e}")
def post_error(self, error_description, job_name="", with_instance_info=True):
instance_id, instance_type = "unknown", "unknown"
if with_instance_info:
instance_id = Shell.run("ec2metadata --instance-id") or instance_id
instance_type = Shell.run("ec2metadata --instance-type") or instance_type
if not job_name:
job_name = os.getenv("CHECK_NAME", "unknown")
line_err = f":red_circle: *Error: {error_description}*\n\n"
line_ghr = f" *Runner:* `{instance_type}`, `{instance_id}`\n"
line_job = f" *Job:* `{job_name}`\n"
line_pr_ = f" *PR:* <https://github.com/{self.repo}/pull/{self.pr_number}|#{self.pr_number}>\n"
line_br_ = f" *Branch:* `{self.head_ref}`, <{self.commit_url}|commit>\n"
message = line_err
message += line_job
if with_instance_info:
message += line_ghr
if self.pr_number > 0:
message += line_pr_
else:
message += line_br_
self.post(message)
if __name__ == "__main__":
# test
buddy = CIBuddy(dry_run=True)
buddy.post_error("Out of memory")


@ -1,4 +1,5 @@
import os
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator, List, Union
@ -42,3 +43,43 @@ class GHActions:
for line in lines:
print(line)
print("::endgroup::")
class Shell:
@classmethod
def run_strict(cls, command):
subprocess.run(
command + " 2>&1",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
)
@classmethod
def run(cls, command):
res = ""
result = subprocess.run(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
if result.returncode == 0:
res = result.stdout
return res.strip()
@classmethod
def check(cls, command):
result = subprocess.run(
command + " 2>&1",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
return result.returncode == 0
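A brief usage sketch of the new Shell helper; the commands themselves are placeholders, not from this commit:

from ci_utils import Shell

# run() returns stripped stdout on success and an empty string on failure.
kernel = Shell.run("uname -r")

# check() returns True or False based on the exit code; stderr is folded into stdout.
if Shell.check("grep -q 'Out of memory' /var/log/syslog"):
    print("OOM messages found")

# run_strict() raises subprocess.CalledProcessError on a non-zero exit code.
Shell.run_strict("true")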


@ -9,4 +9,21 @@
<key1>value1</key1>
</collection1>
</named_collections>
<remote_servers>
<replicated_nc_nodes_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_with_keeper</host>
<port>9000</port>
</replica>
<replica>
<host>node_with_keeper_2</host>
<port>9000</port>
</replica>
</shard>
<allow_distributed_ddl_queries>true</allow_distributed_ddl_queries>
</replicated_nc_nodes_cluster>
</remote_servers>
</clickhouse>


@ -1,4 +1,9 @@
<clickhouse>
<profiles>
<default>
<ignore_on_cluster_for_replicated_named_collections_queries>0</ignore_on_cluster_for_replicated_named_collections_queries>
</default>
</profiles>
<users>
<default>
<password></password>


@ -3,6 +3,8 @@ import pytest
import os
import time
from helpers.cluster import ClickHouseCluster
from contextlib import nullcontext as does_not_raise
from helpers.client import QueryRuntimeException
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
NAMED_COLLECTIONS_CONFIG = os.path.join(
@ -761,3 +763,32 @@ def test_keeper_storage(cluster):
check_dropped(node1)
check_dropped(node2)
@pytest.mark.parametrize(
"ignore, expected_raise",
[(True, does_not_raise()), (False, pytest.raises(QueryRuntimeException))],
)
def test_keeper_storage_remove_on_cluster(cluster, ignore, expected_raise):
node = cluster.instances["node_with_keeper"]
replace_in_users_config(
node,
"ignore_on_cluster_for_replicated_named_collections_queries>.",
f"ignore_on_cluster_for_replicated_named_collections_queries>{int(ignore)}",
)
node.query("SYSTEM RELOAD CONFIG")
with expected_raise:
node.query(
"DROP NAMED COLLECTION IF EXISTS test_nc ON CLUSTER `replicated_nc_nodes_cluster`"
)
node.query(
f"CREATE NAMED COLLECTION test_nc ON CLUSTER `replicated_nc_nodes_cluster` AS key1=1, key2=2 OVERRIDABLE"
)
node.query(
f"ALTER NAMED COLLECTION test_nc ON CLUSTER `replicated_nc_nodes_cluster` SET key2=3"
)
node.query(
f"DROP NAMED COLLECTION test_nc ON CLUSTER `replicated_nc_nodes_cluster`"
)


@ -596,19 +596,116 @@ def test_partition_columns(started_cluster):
)
assert result == 1
# instance.query(
# f"""
# DROP TABLE IF EXISTS {TABLE_NAME};
# CREATE TABLE {TABLE_NAME} (a Int32, b String, c DateTime)
# ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
# )
# assert (
# int(
# instance.query(
# f"SELECT count() FROM {TABLE_NAME} WHERE c != toDateTime('2000/01/05')"
# )
# )
# == num_rows - 1
# )
# instance.query(f"SELECT a, b, c, FROM {TABLE_NAME}")
# assert False
instance.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (a Nullable(Int32), b Nullable(String), c Nullable(Date32), d Nullable(Int32), e Nullable(Bool))
ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
)
assert (
"""1 test1 2000-01-01 1 false
2 test2 2000-01-02 2 false
3 test3 2000-01-03 3 false
4 test4 2000-01-04 4 false
5 test5 2000-01-05 5 false
6 test6 2000-01-06 6 false
7 test7 2000-01-07 7 false
8 test8 2000-01-08 8 false
9 test9 2000-01-09 9 false"""
== instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY b").strip()
)
assert (
int(
instance.query(
f"SELECT count() FROM {TABLE_NAME} WHERE c == toDateTime('2000/01/05')"
)
)
== 1
)
# Subset of columns should work.
instance.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (b Nullable(String), c Nullable(Date32), d Nullable(Int32))
ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
)
assert (
"""test1 2000-01-01 1
test2 2000-01-02 2
test3 2000-01-03 3
test4 2000-01-04 4
test5 2000-01-05 5
test6 2000-01-06 6
test7 2000-01-07 7
test8 2000-01-08 8
test9 2000-01-09 9"""
== instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY b").strip()
)
for i in range(num_rows + 1, 2 * num_rows + 1):
data = [
(
i,
"test" + str(i),
datetime.strptime(f"2000-01-{i}", "%Y-%m-%d"),
i,
False,
)
]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.write.mode("append").format("delta").partitionBy(partition_columns).save(
f"/{TABLE_NAME}"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
ok = False
for file in files:
if file.endswith("last_checkpoint"):
ok = True
assert ok
result = int(
instance.query(
f"""SELECT count()
FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')
"""
)
)
assert result == num_rows * 2
assert (
"""1 test1 2000-01-01 1 false
2 test2 2000-01-02 2 false
3 test3 2000-01-03 3 false
4 test4 2000-01-04 4 false
5 test5 2000-01-05 5 false
6 test6 2000-01-06 6 false
7 test7 2000-01-07 7 false
8 test8 2000-01-08 8 false
9 test9 2000-01-09 9 false
10 test10 2000-01-10 10 false
11 test11 2000-01-11 11 false
12 test12 2000-01-12 12 false
13 test13 2000-01-13 13 false
14 test14 2000-01-14 14 false
15 test15 2000-01-15 15 false
16 test16 2000-01-16 16 false
17 test17 2000-01-17 17 false
18 test18 2000-01-18 18 false"""
== instance.query(
f"""
SELECT * FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123') ORDER BY c
"""
).strip()
)
assert (
int(
instance.query(
f"SELECT count() FROM {TABLE_NAME} WHERE c == toDateTime('2000/01/15')"
)
)
== 1
)


@ -78,13 +78,13 @@ def wait_rabbitmq_to_start(rabbitmq_docker_id, cookie, timeout=180):
def kill_rabbitmq(rabbitmq_id):
p = subprocess.Popen(("docker", "stop", rabbitmq_id), stdout=subprocess.PIPE)
p.communicate()
p.wait(timeout=60)
return p.returncode == 0
def revive_rabbitmq(rabbitmq_id, cookie):
p = subprocess.Popen(("docker", "start", rabbitmq_id), stdout=subprocess.PIPE)
p.communicate()
p.wait(timeout=60)
wait_rabbitmq_to_start(rabbitmq_id, cookie)


@ -17,3 +17,27 @@ ENGINE = MergeTree ORDER BY conversation;
INSERT INTO t2(conversation) VALUES (42);
select * from t2;
drop table t1;
INSERT INTO t2(conversation) VALUES (42); -- { serverError UNKNOWN_TABLE }
drop table t2;
CREATE TABLE t2 (
`conversation` UInt64,
CONSTRAINT constraint_conversation CHECK conversation IN (SELECT id FROM t1)
)
ENGINE = MergeTree ORDER BY conversation;
INSERT INTO t2(conversation) VALUES (42); -- { serverError UNKNOWN_TABLE }
CREATE TABLE t1 (
`id` UInt64
)
ENGINE = MergeTree ORDER BY id;
INSERT INTO t1(id) VALUES (42);
INSERT INTO t2(conversation) VALUES (42);
select * from t2;


@ -0,0 +1,6 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS table_with_materialized;
CREATE TABLE table_with_materialized (col String MATERIALIZED 'A') ENGINE = Memory;
SELECT number FROM numbers(1) AS n, table_with_materialized;
DROP TABLE table_with_materialized;