mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Fix some bugs
This commit is contained in:
parent
07508cb381
commit
af7aa7de56
@ -1,6 +1,9 @@
|
|||||||
#include "Common.h"
|
#include "Common.h"
|
||||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||||
#include <Storages/ObjectStorage/StorageObjectStorage.h>
|
#include <Storages/ObjectStorage/StorageObjectStorage.h>
|
||||||
|
#include <Poco/DateTimeFormat.h>
|
||||||
|
#include <Poco/DateTimeFormatter.h>
|
||||||
|
#include <Poco/Logger.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -13,6 +16,10 @@ std::vector<String> listFiles(
|
|||||||
{
|
{
|
||||||
auto key = std::filesystem::path(configuration.getPath()) / prefix;
|
auto key = std::filesystem::path(configuration.getPath()) / prefix;
|
||||||
RelativePathsWithMetadata files_with_metadata;
|
RelativePathsWithMetadata files_with_metadata;
|
||||||
|
// time_t now = time(nullptr);
|
||||||
|
Poco::DateTime now;
|
||||||
|
std::string formatted = Poco::DateTimeFormatter::format(now, Poco::DateTimeFormat::ISO8601_FORMAT);
|
||||||
|
LOG_ERROR(&Poco::Logger::get("Inside listFiles"), "Time of files listing: {}", formatted);
|
||||||
object_storage.listObjects(key, files_with_metadata, 0);
|
object_storage.listObjects(key, files_with_metadata, 0);
|
||||||
Strings res;
|
Strings res;
|
||||||
for (const auto & file_with_metadata : files_with_metadata)
|
for (const auto & file_with_metadata : files_with_metadata)
|
||||||
|
@ -36,7 +36,7 @@ public:
|
|||||||
|
|
||||||
void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
|
void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
|
||||||
{
|
{
|
||||||
BaseStorageConfiguration::update(object_storage, local_context);
|
// BaseStorageConfiguration::update(object_storage, local_context);
|
||||||
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
|
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
|
||||||
if (current_metadata && *current_metadata == *new_metadata)
|
if (current_metadata && *current_metadata == *new_metadata)
|
||||||
return;
|
return;
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include <Storages/ObjectStorage/Utils.h>
|
#include <Storages/ObjectStorage/Utils.h>
|
||||||
#include <Storages/StorageFactory.h>
|
#include <Storages/StorageFactory.h>
|
||||||
#include <Storages/VirtualColumnUtils.h>
|
#include <Storages/VirtualColumnUtils.h>
|
||||||
|
#include "Databases/LoadingStrictnessLevel.h"
|
||||||
#include "Storages/ColumnsDescription.h"
|
#include "Storages/ColumnsDescription.h"
|
||||||
|
|
||||||
|
|
||||||
@ -68,6 +69,27 @@ String StorageObjectStorage::getPathSample(StorageInMemoryMetadata metadata, Con
|
|||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void printConfiguration(const Poco::Util::AbstractConfiguration & config, std::string log_name, const std::string & prefix = "")
|
||||||
|
{
|
||||||
|
Poco::Util::AbstractConfiguration::Keys keys;
|
||||||
|
config.keys(prefix, keys);
|
||||||
|
|
||||||
|
for (const auto & key : keys)
|
||||||
|
{
|
||||||
|
std::string fullKey = prefix.empty() ? key : (prefix + "." + key);
|
||||||
|
|
||||||
|
if (config.hasProperty(fullKey))
|
||||||
|
{
|
||||||
|
std::string value = config.getString(fullKey);
|
||||||
|
LOG_DEBUG(&Poco::Logger::get(log_name), "{} = {}", fullKey, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recursively print sub-configurations
|
||||||
|
printConfiguration(config, fullKey, log_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
StorageObjectStorage::StorageObjectStorage(
|
StorageObjectStorage::StorageObjectStorage(
|
||||||
ConfigurationPtr configuration_,
|
ConfigurationPtr configuration_,
|
||||||
ObjectStoragePtr object_storage_,
|
ObjectStoragePtr object_storage_,
|
||||||
@ -77,6 +99,7 @@ StorageObjectStorage::StorageObjectStorage(
|
|||||||
const ConstraintsDescription & constraints_,
|
const ConstraintsDescription & constraints_,
|
||||||
const String & comment,
|
const String & comment,
|
||||||
std::optional<FormatSettings> format_settings_,
|
std::optional<FormatSettings> format_settings_,
|
||||||
|
LoadingStrictnessLevel mode,
|
||||||
bool distributed_processing_,
|
bool distributed_processing_,
|
||||||
ASTPtr partition_by_)
|
ASTPtr partition_by_)
|
||||||
: IStorage(table_id_)
|
: IStorage(table_id_)
|
||||||
@ -87,11 +110,27 @@ StorageObjectStorage::StorageObjectStorage(
|
|||||||
, distributed_processing(distributed_processing_)
|
, distributed_processing(distributed_processing_)
|
||||||
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
|
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
|
||||||
{
|
{
|
||||||
ColumnsDescription columns{columns_};
|
// LOG_DEBUG(&Poco::Logger::get("StorageObjectStorage Creation"), "Columns size {}", columns.size());
|
||||||
LOG_DEBUG(&Poco::Logger::get("StorageObjectStorage Creation"), "Columns size {}", columns.size());
|
printConfiguration(context->getConfigRef(), "Storage create");
|
||||||
configuration->update(object_storage, context);
|
try
|
||||||
|
{
|
||||||
|
// configuration->update(object_storage, context);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
if (mode <= LoadingStrictnessLevel::CREATE)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::string sample_path;
|
std::string sample_path;
|
||||||
|
ColumnsDescription columns{columns_};
|
||||||
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
|
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
|
||||||
configuration->check(context);
|
configuration->check(context);
|
||||||
|
|
||||||
@ -271,6 +310,7 @@ void StorageObjectStorage::read(
|
|||||||
size_t num_streams)
|
size_t num_streams)
|
||||||
{
|
{
|
||||||
configuration->update(object_storage, local_context);
|
configuration->update(object_storage, local_context);
|
||||||
|
printConfiguration(local_context->getConfigRef(), "Select query");
|
||||||
if (partition_by && configuration->withPartitionWildcard())
|
if (partition_by && configuration->withPartitionWildcard())
|
||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
|
||||||
|
@ -57,6 +57,7 @@ public:
|
|||||||
const ConstraintsDescription & constraints_,
|
const ConstraintsDescription & constraints_,
|
||||||
const String & comment,
|
const String & comment,
|
||||||
std::optional<FormatSettings> format_settings_,
|
std::optional<FormatSettings> format_settings_,
|
||||||
|
LoadingStrictnessLevel mode,
|
||||||
bool distributed_processing_ = false,
|
bool distributed_processing_ = false,
|
||||||
ASTPtr partition_by_ = nullptr);
|
ASTPtr partition_by_ = nullptr);
|
||||||
|
|
||||||
@ -217,6 +218,7 @@ public:
|
|||||||
|
|
||||||
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
|
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
|
||||||
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
|
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
|
||||||
virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0;
|
virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0;
|
||||||
|
@ -58,6 +58,7 @@ static std::shared_ptr<StorageObjectStorage> createStorageObjectStorage(
|
|||||||
args.constraints,
|
args.constraints,
|
||||||
args.comment,
|
args.comment,
|
||||||
format_settings,
|
format_settings,
|
||||||
|
args.mode,
|
||||||
/* distributed_processing */ false,
|
/* distributed_processing */ false,
|
||||||
partition_by);
|
partition_by);
|
||||||
}
|
}
|
||||||
|
@ -118,6 +118,7 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration>::executeImpl(
|
|||||||
ConstraintsDescription{},
|
ConstraintsDescription{},
|
||||||
String{},
|
String{},
|
||||||
/* format_settings */ std::nullopt,
|
/* format_settings */ std::nullopt,
|
||||||
|
/* mode */ LoadingStrictnessLevel::CREATE,
|
||||||
/* distributed_processing */ false,
|
/* distributed_processing */ false,
|
||||||
nullptr);
|
nullptr);
|
||||||
|
|
||||||
|
@ -43,6 +43,7 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, Configuration>::execute
|
|||||||
ConstraintsDescription{},
|
ConstraintsDescription{},
|
||||||
/* comment */ String{},
|
/* comment */ String{},
|
||||||
/* format_settings */ std::nullopt, /// No format_settings
|
/* format_settings */ std::nullopt, /// No format_settings
|
||||||
|
/* mode */ LoadingStrictnessLevel::CREATE,
|
||||||
/* distributed_processing */ true,
|
/* distributed_processing */ true,
|
||||||
/*partition_by_=*/nullptr);
|
/*partition_by_=*/nullptr);
|
||||||
}
|
}
|
||||||
|
@ -5,4 +5,5 @@
|
|||||||
<path>cache1</path>
|
<path>cache1</path>
|
||||||
</cache1>
|
</cache1>
|
||||||
</filesystem_caches>
|
</filesystem_caches>
|
||||||
|
<!-- <async_load_databases>false</async_load_databases> -->
|
||||||
</clickhouse>
|
</clickhouse>
|
||||||
|
@ -6,6 +6,8 @@ import time
|
|||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from logging import log
|
||||||
|
|
||||||
import pyspark
|
import pyspark
|
||||||
import pytest
|
import pytest
|
||||||
from azure.storage.blob import BlobServiceClient
|
from azure.storage.blob import BlobServiceClient
|
||||||
@ -856,14 +858,20 @@ def test_restart_broken_s3(started_cluster):
|
|||||||
)
|
)
|
||||||
minio_client.remove_bucket(bucket)
|
minio_client.remove_bucket(bucket)
|
||||||
|
|
||||||
|
print("Before restart: ", datetime.now())
|
||||||
|
|
||||||
instance.restart_clickhouse()
|
instance.restart_clickhouse()
|
||||||
|
|
||||||
assert "NoSuchBucket" in instance.query_and_get_error(
|
# assert "NoSuchBucket" in instance.query_and_get_error(
|
||||||
f"SELECT count() FROM {TABLE_NAME}"
|
# f"SELECT count() FROM {TABLE_NAME}"
|
||||||
)
|
# )
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
minio_client.make_bucket(bucket)
|
minio_client.make_bucket(bucket)
|
||||||
|
|
||||||
|
print("Before successful select: ", datetime.now())
|
||||||
|
|
||||||
files = default_upload_directory(
|
files = default_upload_directory(
|
||||||
started_cluster,
|
started_cluster,
|
||||||
"s3",
|
"s3",
|
||||||
|
Loading…
Reference in New Issue
Block a user