Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-24 08:32:02 +00:00)

Debug commit

Parent: f54a4b073a
Commit: d2eded16aa

@@ -110,6 +110,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage)
add_headers_and_sources(dbms Storages/ObjectStorage/Azure)
add_headers_and_sources(dbms Storages/ObjectStorage/S3)
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Common/NamedCollections)

@@ -1,15 +1,17 @@
#include <Disks/ObjectStorages/Local/LocalObjectStorage.h>

#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <exception>
#include <filesystem>
#include <stdexcept>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/getRandomASCIIString.h>
#include <filesystem>
#include <Common/logger_useful.h>

namespace fs = std::filesystem;

@@ -80,6 +80,7 @@ UUID parseUUID(std::span<const UInt8> src)
    return uuid;
}


void NO_INLINE throwAtAssertionFailed(const char * s, ReadBuffer & buf)
{
    WriteBufferFromOwnString out;

@@ -88,12 +89,15 @@ void NO_INLINE throwAtAssertionFailed(const char * s, ReadBuffer & buf)
    if (buf.eof())
        out << " at end of stream.";
    else
        out << " before: " << quote << String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position()));
        out << " before: " << quote << String(buf.position(), std::min(static_cast<ptrdiff_t>(1000), buf.buffer().end() - buf.position()));

    throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "Cannot parse input: expected {}", out.str());
    throw Exception(
        ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED,
        "Cannot parse input: expected {} {}",
        out.str(),
        buf.buffer().end() - buf.position());
}


bool checkString(const char * s, ReadBuffer & buf)
{
    for (; *s; ++s)
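
One detail worth noting in the hunk above: `std::min` deduces a single template parameter, so both arguments must have the same type. `buf.buffer().end() - buf.position()` is a pointer difference (`ptrdiff_t`), which is why the literal limit is cast before the call. A minimal standalone sketch of that constraint (the string and limit here are illustrative only):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>

int main()
{
    std::string input = "expected ',' before: garbage...";
    const char * pos = input.data();
    const char * end = input.data() + input.size();

    // end - pos has type std::ptrdiff_t; without the cast,
    // std::min(1000, end - pos) would not compile (int vs ptrdiff_t).
    auto shown = std::min(static_cast<std::ptrdiff_t>(1000), end - pos);

    std::cout << std::string(pos, shown) << '\n';
}
```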

@@ -42,19 +42,32 @@ const Block & PullingPipelineExecutor::getHeader() const

bool PullingPipelineExecutor::pull(Chunk & chunk)
{
    LOG_DEBUG(&Poco::Logger::get("Pulling 1"), "Pulling 1");

    if (!executor)
    {
        executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
        executor->setReadProgressCallback(pipeline.getReadProgressCallback());
    }

    LOG_DEBUG(&Poco::Logger::get("Pulling 2"), "Pulling 2");

    if (!executor->checkTimeLimitSoft())
        return false;

    LOG_DEBUG(&Poco::Logger::get("Pulling 3"), "Pulling 3");

    if (!executor->executeStep(&has_data_flag))
        return false;

    LOG_DEBUG(&Poco::Logger::get("Pulling 4"), "Pulling 4");

    chunk = pulling_format->getChunk();

    LOG_DEBUG(&Poco::Logger::get("Pulling 5"), "Pulling 5");

    return true;
}
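
For orientation, `pull(Chunk &)` above is one step of a drain loop on the caller's side, so each of the extra "Pulling 1".."Pulling 5" log lines fires once per pulled chunk. A rough sketch of how a caller typically drives it, assuming an already-built `QueryPipeline` (the helper function itself is hypothetical, not part of this diff):

```cpp
#include <Processors/Chunk.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>

/// Hypothetical helper: drain a pulling pipeline chunk by chunk and
/// return the total number of rows seen.
size_t drainPipeline(DB::QueryPipeline & pipeline)
{
    DB::PullingPipelineExecutor executor(pipeline);

    size_t total_rows = 0;
    DB::Chunk chunk;
    while (executor.pull(chunk))   /// false: cancelled, time limit hit, or no more data
    {
        /// Each iteration corresponds to one pass through "Pulling 1".."Pulling 5" above.
        total_rows += chunk.getNumRows();
    }
    return total_rows;
}
```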

@@ -20,6 +20,7 @@ void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & coll
    format = collection.getOrDefault<String>("format", "auto");
    compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
    structure = collection.getOrDefault<String>("structure", "auto");
    paths = {path};
}

@@ -56,6 +57,7 @@ void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool wi
    {
        compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
    }
    paths = {path};
}

StorageObjectStorage::QuerySettings StorageLocalConfiguration::getQuerySettings(const ContextPtr & context) const
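
The `compression_method` line in the first hunk resolves the setting in three steps: use `compression_method` if the collection has it, otherwise fall back to `compression`, otherwise default to `"auto"`. A minimal sketch of that fallback with a plain map standing in for `NamedCollection` (names and values are illustrative):

```cpp
#include <iostream>
#include <map>
#include <string>

/// Stand-in for NamedCollection::getOrDefault<String>.
std::string getOrDefault(const std::map<std::string, std::string> & collection,
                         const std::string & key, const std::string & def)
{
    auto it = collection.find(key);
    return it == collection.end() ? def : it->second;
}

int main()
{
    /// Only the "compression" key is present in this example collection.
    std::map<std::string, std::string> collection{{"compression", "gzip"}};

    /// "compression_method" wins if present, then "compression", then "auto".
    auto compression_method = getOrDefault(
        collection, "compression_method", getOrDefault(collection, "compression", "auto"));

    std::cout << compression_method << '\n';   /// prints "gzip"
}
```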

@@ -2,11 +2,13 @@

#include <memory>
#include "Disks/ObjectStorages/Local/LocalObjectStorage.h"
#include "config.h"

#if USE_AWS_S3
# include <IO/S3Settings.h>
# include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>

#include <filesystem>

namespace fs = std::filesystem;

namespace DB
{

@@ -34,12 +36,12 @@ public:
    String getDataSourceDescription() const override { return ""; }
    StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;

    void check(ContextPtr) const override { }
    void validateNamespace(const String &) const override { }
    ConfigurationPtr clone() override { return std::make_shared<StorageLocalConfiguration>(*this); }
    bool isStaticConfiguration() const override { return true; }

    ObjectStoragePtr createObjectStorage(ContextPtr, bool) override { return std::make_shared<LocalObjectStorage>(path); }
    ObjectStoragePtr createObjectStorage(ContextPtr, bool) override
    {
        return std::make_shared<LocalObjectStorage>(fs::path{path}.parent_path());
    }

    void addStructureAndFormatToArgs(ASTs &, const String &, const String &, ContextPtr) override { }

@@ -51,5 +53,3 @@ private:
};

}

#endif
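
The change to `createObjectStorage` above roots the `LocalObjectStorage` at the directory containing the configured path, via `fs::path{path}.parent_path()`, rather than at the path itself. A small standalone illustration of `parent_path()` behaviour (the paths are made up for the example):

```cpp
#include <cassert>
#include <filesystem>

int main()
{
    namespace fs = std::filesystem;

    /// For a file path, parent_path() is the containing directory.
    assert(fs::path{"/data/events.csv"}.parent_path() == "/data");

    /// With a trailing separator the last element is an empty filename,
    /// so the "parent" is the same directory without the slash.
    assert(fs::path{"/data/"}.parent_path() == "/data");
}
```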

@@ -124,6 +124,12 @@ public:
        , num_streams(num_streams_)
        , distributed_processing(distributed_processing_)
    {
        LOG_DEBUG(&Poco::Logger::get("Read step created"), "Read step created");
        LOG_DEBUG(&Poco::Logger::get("Conf"), "Keys size: {}", configuration->getPaths().size());
        for (auto && key : configuration->getPaths())
        {
            LOG_DEBUG(&Poco::Logger::get("Conf"), "Current key: {}", key);
        }
    }

    std::string getName() const override { return name; }

@@ -422,6 +428,12 @@ SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, c
            DEFAULT_SCHEMA_CACHE_ELEMENTS));
        return schema_cache;
    }
    else if (storage_type_name == "local")
    {
        static SchemaCache schema_cache(
            context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_local", DEFAULT_SCHEMA_CACHE_ELEMENTS));
        return schema_cache;
    }
    else
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_name);
}
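
The `getSchemaCache` hunk follows a per-storage-type, function-local-static pattern: the first call for "local" constructs one process-wide `SchemaCache` sized from the `schema_inference_cache_max_elements_for_local` server setting, and every later call returns that same instance. A rough sketch of the idiom with stand-in types (nothing below is ClickHouse API; the default size is a placeholder for DEFAULT_SCHEMA_CACHE_ELEMENTS):

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

/// Stand-in for DB::SchemaCache: capacity is fixed at construction.
struct DemoSchemaCache
{
    explicit DemoSchemaCache(size_t max_elements_) : max_elements(max_elements_) {}
    size_t max_elements;
};

/// Stand-in for context->getConfigRef().getUInt(key, default_value).
size_t getConfigUInt(const std::string & /*key*/, size_t default_value)
{
    return default_value;
}

DemoSchemaCache & getSchemaCacheFor(const std::string & storage_type_name)
{
    if (storage_type_name == "local")
    {
        /// Constructed once, on first use, sized from configuration;
        /// subsequent calls reuse the same instance.
        static DemoSchemaCache cache(
            getConfigUInt("schema_inference_cache_max_elements_for_local", 4096));
        return cache;
    }
    throw std::runtime_error("Unsupported storage type: " + storage_type_name);
}
```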

@@ -1,17 +1,18 @@
#include "StorageObjectStorageSource.h"
#include <Storages/VirtualColumnUtils.h>
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/Archives/createArchiveReader.h>
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include "Common/logger_useful.h"
#include <Common/parseGlobs.h>

namespace fs = std::filesystem;

@@ -69,6 +70,7 @@ StorageObjectStorageSource::StorageObjectStorageSource(
    , schema_cache(StorageObjectStorage::getSchemaCache(context_, configuration->getTypeName()))
    , create_reader_scheduler(threadPoolCallbackRunnerUnsafe<ReaderHolder>(*create_reader_pool, "Reader"))
{
    LOG_DEBUG(&Poco::Logger::get("Source created"), "Source created");
}

StorageObjectStorageSource::~StorageObjectStorageSource()

@@ -131,6 +133,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
    {
        ConfigurationPtr copy_configuration = configuration->clone();
        auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);

        if (filter_dag)
        {
            auto keys = configuration->getPaths();

@@ -142,6 +145,19 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
            copy_configuration->setPaths(keys);
        }

        LOG_DEBUG(&Poco::Logger::get("Conf"), "Keys size: {}", configuration->getPaths().size());
        for (auto && key : configuration->getPaths())
        {
            LOG_DEBUG(&Poco::Logger::get("Conf"), "Current key: {}", key);
        }

        LOG_DEBUG(&Poco::Logger::get("Copy Conf"), "Keys size: {}", copy_configuration->getPaths().size());
        for (auto && key : copy_configuration->getPaths())
        {
            LOG_DEBUG(&Poco::Logger::get("Copy Conf"), "Current key: {}", key);
        }

        iterator = std::make_unique<KeysIterator>(
            object_storage, copy_configuration, virtual_columns, is_archive ? nullptr : read_keys,
            settings.ignore_non_existent_file, file_progress_callback);

@@ -170,8 +186,11 @@ Chunk StorageObjectStorageSource::generate()
{
    lazyInitialize();

    while (true)
    {
        LOG_DEBUG(&Poco::Logger::get("Generating"), "Generating reader: {}", !(!reader));

        if (isCancelled() || !reader)
        {
            if (reader)

@@ -179,10 +198,15 @@ Chunk StorageObjectStorageSource::generate()
                break;
        }

        LOG_DEBUG(&Poco::Logger::get("Generating 2"), "Generating 2");

        Chunk chunk;
        if (reader->pull(chunk))
        {
            LOG_DEBUG(&Poco::Logger::get("Generating 3"), "Generating 3");

            UInt64 num_rows = chunk.getNumRows();
            LOG_DEBUG(&Poco::Logger::get("Creating_chunk"), "Chunk size: {}", num_rows);
            total_rows_in_file += num_rows;

            size_t chunk_size = 0;

@@ -230,6 +254,9 @@ Chunk StorageObjectStorageSource::generate()
            return chunk;
        }

        LOG_DEBUG(&Poco::Logger::get("Generating 4"), "Generating 4");

        if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files)
            addNumRowsToCache(*reader.getObjectInfo(), total_rows_in_file);

@@ -300,6 +327,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
    }
    while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);

    LOG_DEBUG(&Poco::Logger::get("Unreached point 1"), "");

    QueryPipelineBuilder builder;
    std::shared_ptr<ISource> source;
    std::unique_ptr<ReadBuffer> read_buf;

@@ -324,11 +353,17 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
        return schema_cache->tryGetNumRows(cache_key, get_last_mod_time);
    };

    LOG_DEBUG(&Poco::Logger::get("Unreached point 2"), "");

    std::optional<size_t> num_rows_from_cache = need_only_count
        && context_->getSettingsRef().use_cache_for_count_from_files
        ? try_get_num_rows_from_cache()
        : std::nullopt;

    LOG_DEBUG(&Poco::Logger::get("Unreached point 3"), "");

    if (num_rows_from_cache)
    {
        /// We should not return single chunk with all number of rows,

@@ -341,6 +376,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
    }
    else
    {
        LOG_DEBUG(&Poco::Logger::get("Unreached point 4"), "");

        CompressionMethod compression_method;
        if (const auto * object_info_in_archive = dynamic_cast<const ArchiveIterator::ObjectInfoInArchive *>(object_info.get()))
        {

@@ -625,11 +662,18 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
    , keys(configuration->getPaths())
    , ignore_non_existent_files(ignore_non_existent_files_)
{
    LOG_DEBUG(&Poco::Logger::get("Keys size"), "Keys size: {}", keys.size());
    for (auto && key : keys)
    {
        LOG_DEBUG(&Poco::Logger::get("Current Key"), "Current key: {}", key);
    }
    if (read_keys_)
    {
        /// TODO: should we add metadata if we anyway fetch it if file_progress_callback is passed?

        for (auto && key : keys)
        {
            LOG_DEBUG(&Poco::Logger::get("Current Key"), "Current key: {}", key);
            auto object_info = std::make_shared<ObjectInfo>(key);
            read_keys_->emplace_back(object_info);
        }

@@ -12,12 +12,13 @@

#include <Interpreters/parseColumnsListForTableFunction.h>

#include <Storages/ObjectStorage/Utils.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/Utils.h>


namespace DB

@@ -203,6 +204,11 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
        .allow_readonly = false
    });
#endif
    factory.registerFunction<TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>>(
        {.documentation
             = {.description = R"(The table function can be used to read the data stored locally.)",
                .examples{{"local", "SELECT * FROM local(file_path, [format, compression, structure])", ""}}},
         .allow_readonly = false});
}

#if USE_AZURE_BLOB_STORAGE

@@ -223,4 +229,5 @@ template class TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfigurati
template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif

template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
}