Merge pull request #36314 from CurtizJ/print-bad-filenames

Show names of erroneous files in case of parsing errors while executing table functions
This commit is contained in:
Kseniia Sumarokova 2022-04-22 13:24:55 +02:00 committed by GitHub
commit 33bb48106f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 251 additions and 85 deletions

View File

@ -1480,7 +1480,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
std::optional<Exception> & current_exception)
std::unique_ptr<Exception> & current_exception)
{
if (!is_interactive && cancelled)
return MultiQueryProcessingStage::QUERIES_END;
@ -1518,7 +1518,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
}
catch (Exception & e)
{
current_exception.emplace(e);
current_exception.reset(e.clone());
return MultiQueryProcessingStage::PARSING_EXCEPTION;
}
@ -1599,7 +1599,7 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
String full_query; // full_query is the query + inline INSERT data + trailing comments (the latter is our best guess for now).
String query_to_execute;
ASTPtr parsed_query;
std::optional<Exception> current_exception;
std::unique_ptr<Exception> current_exception;
while (true)
{
@ -1943,7 +1943,7 @@ void ClientBase::runInteractive()
{
/// We don't need to handle the test hints in the interactive mode.
std::cerr << "Exception on client:" << std::endl << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
client_exception = std::make_unique<Exception>(e);
client_exception.reset(e.clone());
}
if (client_exception)

View File

@ -80,7 +80,7 @@ protected:
MultiQueryProcessingStage analyzeMultiQueryText(
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
std::optional<Exception> & current_exception);
std::unique_ptr<Exception> & current_exception);
static void clearTerminal();
void showClientVersion();

View File

@ -154,17 +154,17 @@ void LocalConnection::sendQuery(
catch (const Exception & e)
{
state->io.onException();
state->exception.emplace(e);
state->exception.reset(e.clone());
}
catch (const std::exception & e)
{
state->io.onException();
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
state->exception = std::make_unique<Exception>(Exception::CreateFromSTDTag{}, e);
}
catch (...)
{
state->io.onException();
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
state->exception = std::make_unique<Exception>("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
@ -260,17 +260,17 @@ bool LocalConnection::poll(size_t)
catch (const Exception & e)
{
state->io.onException();
state->exception.emplace(e);
state->exception.reset(e.clone());
}
catch (const std::exception & e)
{
state->io.onException();
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
state->exception = std::make_unique<Exception>(Exception::CreateFromSTDTag{}, e);
}
catch (...)
{
state->io.onException();
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
state->exception = std::make_unique<Exception>("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
@ -434,7 +434,7 @@ Packet LocalConnection::receivePacket()
}
case Protocol::Server::Exception:
{
packet.exception = std::make_unique<Exception>(*state->exception);
packet.exception.reset(state->exception->clone());
next_packet_type.reset();
break;
}

View File

@ -32,7 +32,7 @@ struct LocalQueryState
std::unique_ptr<PushingAsyncPipelineExecutor> pushing_async_executor;
InternalProfileEventsQueuePtr profile_queue;
std::optional<Exception> exception;
std::unique_ptr<Exception> exception;
/// Current block to be sent next.
std::optional<Block> block;

View File

@ -555,13 +555,24 @@ std::string ParsingException::displayText() const
{
try
{
if (line_number == -1)
formatted_message = message();
else
formatted_message = message() + fmt::format(": (at row {})\n", line_number);
formatted_message = message();
bool need_newline = false;
if (!file_name.empty())
{
formatted_message += fmt::format(": (in file/uri {})", file_name);
need_newline = true;
}
if (line_number != -1)
{
formatted_message += fmt::format(": (at row {})", line_number);
need_newline = true;
}
if (need_newline)
formatted_message += "\n";
}
catch (...)
{}
catch (...) {}
if (!formatted_message.empty())
{

View File

@ -132,8 +132,15 @@ public:
int getLineNumber() const { return line_number; }
void setLineNumber(int line_number_) { line_number = line_number_;}
const String getFileName() const { return file_name; }
void setFileName(const String & file_name_) { file_name = file_name_; }
Exception * clone() const override { return new ParsingException(*this); }
void rethrow() const override { throw *this; }
private:
ssize_t line_number{-1};
String file_name;
mutable std::string formatted_message;
const char * name() const throw() override { return "DB::ParsingException"; }

View File

@ -32,14 +32,13 @@ public:
};
BrotliReadBuffer::BrotliReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char *existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(std::move(in_))
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, eof_flag(false)
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, eof_flag(false)
{
}

View File

@ -1,13 +1,13 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedReadBufferWrapper.h>
namespace DB
{
class BrotliReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class BrotliReadBuffer : public CompressedReadBufferWrapper
{
public:
explicit BrotliReadBuffer(
@ -21,8 +21,6 @@ public:
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
class BrotliStateWrapper;
std::unique_ptr<BrotliStateWrapper> brotli;

View File

@ -39,10 +39,9 @@ public:
};
Bzip2ReadBuffer::Bzip2ReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char *existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(std::move(in_))
, bz(std::make_unique<Bzip2StateWrapper>())
, eof_flag(false)
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
, bz(std::make_unique<Bzip2StateWrapper>())
, eof_flag(false)
{
}

View File

@ -1,13 +1,13 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedReadBufferWrapper.h>
namespace DB
{
class Bzip2ReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class Bzip2ReadBuffer : public CompressedReadBufferWrapper
{
public:
explicit Bzip2ReadBuffer(
@ -21,8 +21,6 @@ public:
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
class Bzip2StateWrapper;
std::unique_ptr<Bzip2StateWrapper> bz;

View File

@ -0,0 +1,25 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class CompressedReadBufferWrapper : public BufferWithOwnMemory<ReadBuffer>
{
public:
CompressedReadBufferWrapper(
std::unique_ptr<ReadBuffer> in_,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(std::move(in_)) {}
const ReadBuffer & getWrappedReadBuffer() const { return *in; }
protected:
std::unique_ptr<ReadBuffer> in;
};
}

View File

@ -98,8 +98,7 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
{
if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib)
@ -123,10 +122,15 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
return std::make_unique<HadoopSnappyReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
#endif
throw Exception("Unsupported compression method", ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
{
if (method == CompressionMethod::None)
return nested;
throw Exception("Unsupported compression method", ErrorCodes::NOT_IMPLEMENTED);
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment);
}

View File

@ -166,8 +166,7 @@ HadoopSnappyDecoder::Status HadoopSnappyDecoder::readBlock(size_t * avail_in, co
}
HadoopSnappyReadBuffer::HadoopSnappyReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(std::move(in_))
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
, decoder(std::make_unique<HadoopSnappyDecoder>())
, in_available(0)
, in_data(nullptr)

View File

@ -6,7 +6,7 @@
#include <memory>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedReadBufferWrapper.h>
namespace DB
{
@ -67,7 +67,7 @@ private:
};
/// HadoopSnappyReadBuffer implements read buffer for data compressed with hadoop-snappy format.
class HadoopSnappyReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class HadoopSnappyReadBuffer : public CompressedReadBufferWrapper
{
public:
using Status = HadoopSnappyDecoder::Status;
@ -99,7 +99,6 @@ public:
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
std::unique_ptr<HadoopSnappyDecoder> decoder;
size_t in_available;

View File

@ -8,7 +8,7 @@ namespace ErrorCodes
}
LZMAInflatingReadBuffer::LZMAInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment), in(std::move(in_)), eof_flag(false)
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment), eof_flag(false)
{
lstr = LZMA_STREAM_INIT;
lstr.allocator = nullptr;

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/ReadBuffer.h>
#include <lzma.h>
@ -8,7 +8,7 @@
namespace DB
{
class LZMAInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class LZMAInflatingReadBuffer : public CompressedReadBufferWrapper
{
public:
explicit LZMAInflatingReadBuffer(
@ -22,9 +22,7 @@ public:
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
lzma_stream lstr;
bool eof_flag;
};

View File

@ -8,8 +8,7 @@ namespace ErrorCodes
}
Lz4InflatingReadBuffer::Lz4InflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(std::move(in_))
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
, in_data(nullptr)
, out_data(nullptr)
, in_available(0)

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
@ -11,7 +11,7 @@
namespace DB
{
class Lz4InflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class Lz4InflatingReadBuffer : public CompressedReadBufferWrapper
{
public:
explicit Lz4InflatingReadBuffer(
@ -25,8 +25,6 @@ public:
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
LZ4F_dctx* dctx;
void * in_data;

View File

@ -85,6 +85,8 @@ public:
std::optional<size_t> getTotalSize() override;
off_t getPosition() override;
const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }
private:
/// Reader in progress with a list of read segments
struct ReadWorker

View File

@ -7,12 +7,13 @@
#include <IO/HTTPCommon.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadSettings.h>
#include <IO/WithFileName.h>
#include <azure/storage/blobs.hpp>
namespace DB
{
class ReadBufferFromAzureBlobStorage : public SeekableReadBuffer
class ReadBufferFromAzureBlobStorage : public SeekableReadBuffer, public WithFileName
{
public:
@ -33,6 +34,8 @@ public:
size_t getFileOffsetOfBufferEnd() const override { return offset; }
String getFileName() const override { return path; }
private:
void initialize();

View File

@ -2,6 +2,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WithFileName.h>
#include <base/time.h>
#include <functional>
@ -19,7 +20,7 @@
namespace DB
{
class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>
class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileName
{
public:
ReadBufferFromFileBase();
@ -29,7 +30,6 @@ public:
size_t alignment,
std::optional<size_t> file_size_ = std::nullopt);
~ReadBufferFromFileBase() override;
virtual std::string getFileName() const = 0;
/// It is possible to get information about the time of each reading.
struct ProfileInfo

View File

@ -12,6 +12,7 @@
#include <IO/ReadBuffer.h>
#include <IO/ReadSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WithFileName.h>
#include <aws/s3/model/GetObjectResult.h>
@ -25,7 +26,7 @@ namespace DB
/**
* Perform S3 HTTP GET request and provide response to read.
*/
class ReadBufferFromS3 : public SeekableReadBufferWithSize
class ReadBufferFromS3 : public SeekableReadBufferWithSize, public WithFileName
{
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
@ -70,6 +71,8 @@ public:
size_t getFileOffsetOfBufferEnd() const override { return offset; }
String getFileName() const override { return bucket + "/" + key; }
private:
std::unique_ptr<ReadBuffer> initialize();
@ -83,7 +86,7 @@ private:
};
/// Creates separate ReadBufferFromS3 for sequence of ranges of particular object
class ReadBufferS3Factory : public ParallelReadBuffer::ReadBufferFactory
class ReadBufferS3Factory : public ParallelReadBuffer::ReadBufferFactory, public WithFileName
{
public:
explicit ReadBufferS3Factory(
@ -113,6 +116,8 @@ public:
std::optional<size_t> getTotalSize() override;
String getFileName() const override { return bucket + "/" + key; }
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String bucket;

View File

@ -9,6 +9,7 @@
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/WithFileName.h>
#include <base/logger_useful.h>
#include <base/sleep.h>
#include <base/types.h>
@ -85,7 +86,7 @@ public:
namespace detail
{
template <typename UpdatableSessionPtr>
class ReadWriteBufferFromHTTPBase : public SeekableReadBufferWithSize
class ReadWriteBufferFromHTTPBase : public SeekableReadBufferWithSize, public WithFileName
{
public:
using HTTPHeaderEntry = std::tuple<std::string, std::string>;
@ -223,6 +224,8 @@ namespace detail
return read_range.end;
}
String getFileName() const override { return uri.toString(); }
enum class InitializeError
{
/// If error is not retriable, `exception` variable must be set.
@ -674,7 +677,7 @@ public:
}
};
class RangedReadWriteBufferFromHTTPFactory : public ParallelReadBuffer::ReadBufferFactory
class RangedReadWriteBufferFromHTTPFactory : public ParallelReadBuffer::ReadBufferFactory, public WithFileName
{
using OutStreamCallback = ReadWriteBufferFromHTTP::OutStreamCallback;
@ -748,6 +751,8 @@ public:
std::optional<size_t> getTotalSize() override { return total_object_size; }
String getFileName() const override { return uri.toString(); }
private:
RangeGenerator range_generator;
size_t total_object_size;

26
src/IO/WithFileName.cpp Normal file
View File

@ -0,0 +1,26 @@
#include <IO/WithFileName.h>
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/ParallelReadBuffer.h>
namespace DB
{
template <typename T>
static String getFileName(const T & entry)
{
if (const auto * with_file_name = dynamic_cast<const WithFileName *>(&entry))
return with_file_name->getFileName();
return "";
}
String getFileNameFromReadBuffer(const ReadBuffer & in)
{
if (const auto * compressed = dynamic_cast<const CompressedReadBufferWrapper *>(&in))
return getFileName(compressed->getWrappedReadBuffer());
else if (const auto * parallel = dynamic_cast<const ParallelReadBuffer *>(&in))
return getFileName(parallel->getReadBufferFactory());
else
return getFileName(in);
}
}

18
src/IO/WithFileName.h Normal file
View File

@ -0,0 +1,18 @@
#pragma once
#include <base/types.h>
namespace DB
{
class ReadBuffer;
class WithFileName
{
public:
virtual String getFileName() const = 0;
virtual ~WithFileName() = default;
};
String getFileNameFromReadBuffer(const ReadBuffer & in);
}

View File

@ -14,8 +14,7 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(std::move(in_))
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
, eof_flag(false)
{
zstr.zalloc = nullptr;

View File

@ -1,7 +1,7 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/CompressionMethod.h>
#include <zlib.h>
@ -16,7 +16,7 @@ namespace ErrorCodes
/// Reads compressed data from ReadBuffer in_ and performs decompression using zlib library.
/// This buffer is able to seamlessly decompress multiple concatenated zlib streams.
class ZlibInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class ZlibInflatingReadBuffer : public CompressedReadBufferWrapper
{
public:
ZlibInflatingReadBuffer(
@ -31,7 +31,6 @@ public:
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
z_stream zstr;
bool eof_flag;
};

View File

@ -9,7 +9,7 @@ namespace ErrorCodes
}
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment), in(std::move(in_))
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
{
dctx = ZSTD_createDCtx();
input = {nullptr, 0, 0};

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
@ -13,7 +13,7 @@ namespace ErrorCodes
{
}
class ZstdInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class ZstdInflatingReadBuffer : public CompressedReadBufferWrapper
{
public:
explicit ZstdInflatingReadBuffer(
@ -27,7 +27,6 @@ public:
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
ZSTD_DCtx * dctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;

View File

@ -51,6 +51,7 @@ public:
virtual void resetParser();
virtual void setReadBuffer(ReadBuffer & in_);
const ReadBuffer & getReadBuffer() const { return *in; }
virtual const BlockMissingValues & getMissingValues() const
{

View File

@ -1,6 +1,7 @@
#include <Processors/Formats/IRowInputFormat.h>
#include <DataTypes/ObjectUtils.h>
#include <IO/WriteHelpers.h> // toString
#include <IO/WithFileName.h>
#include <base/logger_useful.h>
@ -165,6 +166,7 @@ Chunk IRowInputFormat::generate()
/// Error while trying to obtain verbose diagnostic. Ok to ignore.
}
e.setFileName(getFileNameFromReadBuffer(getReadBuffer()));
e.setLineNumber(total_rows);
e.addMessage(verbose_diagnostic);
throw;
@ -188,7 +190,12 @@ Chunk IRowInputFormat::generate()
/// Error while trying to obtain verbose diagnostic. Ok to ignore.
}
e.addMessage("(at row " + toString(total_rows) + ")\n" + verbose_diagnostic);
auto file_name = getFileNameFromReadBuffer(getReadBuffer());
if (!file_name.empty())
e.addMessage(fmt::format("(in file/uri {})", file_name));
e.addMessage(fmt::format("(at row {})\n", total_rows));
e.addMessage(verbose_diagnostic);
throw;
}

View File

@ -1,5 +1,6 @@
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <IO/ReadHelpers.h>
#include <IO/WithFileName.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
@ -125,11 +126,19 @@ void ParallelParsingInputFormat::onBackgroundException(size_t offset)
{
background_exception = std::current_exception();
if (ParsingException * e = exception_cast<ParsingException *>(background_exception))
{
if (e->getLineNumber() != -1)
e->setLineNumber(e->getLineNumber() + offset);
auto file_name = getFileNameFromReadBuffer(getReadBuffer());
if (!file_name.empty())
e->setFileName(file_name);
}
}
if (is_server)
tryLogCurrentException(__PRETTY_FUNCTION__);
parsing_finished = true;
first_parser_finished.set();
reader_condvar.notify_all();

View File

@ -202,7 +202,7 @@ void TCPHandler::runImpl()
/** An exception during the execution of request (it must be sent over the network to the client).
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
*/
std::optional<DB::Exception> exception;
std::unique_ptr<DB::Exception> exception;
bool network_error = false;
try
@ -396,7 +396,7 @@ void TCPHandler::runImpl()
catch (const Exception & e)
{
state.io.onException();
exception.emplace(e);
exception.reset(e.clone());
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
@ -420,12 +420,12 @@ void TCPHandler::runImpl()
* We will try to send exception to the client in any case - see below.
*/
state.io.onException();
exception.emplace(Exception::CreateFromPocoTag{}, e);
exception = std::make_unique<DB::Exception>(Exception::CreateFromPocoTag{}, e);
}
catch (const Poco::Exception & e)
{
state.io.onException();
exception.emplace(Exception::CreateFromPocoTag{}, e);
exception = std::make_unique<DB::Exception>(Exception::CreateFromPocoTag{}, e);
}
// Server should die on std logic errors in debug, like with assert()
// or ErrorCodes::LOGICAL_ERROR. This helps catch these errors in
@ -434,7 +434,7 @@ void TCPHandler::runImpl()
catch (const std::logic_error & e)
{
state.io.onException();
exception.emplace(Exception::CreateFromSTDTag{}, e);
exception = std::make_unique<DB::Exception>(Exception::CreateFromSTDTag{}, e);
sendException(*exception, send_exception_with_stack_trace);
std::abort();
}
@ -442,12 +442,12 @@ void TCPHandler::runImpl()
catch (const std::exception & e)
{
state.io.onException();
exception.emplace(Exception::CreateFromSTDTag{}, e);
exception = std::make_unique<DB::Exception>(Exception::CreateFromSTDTag{}, e);
}
catch (...)
{
state.io.onException();
exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
exception = std::make_unique<DB::Exception>("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
try

View File

@ -182,6 +182,11 @@ size_t ReadBufferFromHDFS::getFileOffsetOfBufferEnd() const
return impl->getPosition();
}
String ReadBufferFromHDFS::getFileName() const
{
return impl->hdfs_file_path;
}
}
#endif

View File

@ -11,6 +11,7 @@
#include <base/types.h>
#include <Interpreters/Context.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WithFileName.h>
namespace DB
@ -19,7 +20,7 @@ namespace DB
/** Accepts HDFS path to file and opens it.
* Closes file by himself (thus "owns" a file descriptor).
*/
class ReadBufferFromHDFS : public SeekableReadBufferWithSize
class ReadBufferFromHDFS : public SeekableReadBufferWithSize, public WithFileName
{
struct ReadBufferFromHDFSImpl;
@ -41,6 +42,8 @@ public:
size_t getFileOffsetOfBufferEnd() const override;
String getFileName() const override;
private:
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
};

View File

@ -544,7 +544,6 @@ public:
Chunk chunk;
if (reader->pull(chunk))
{
//Columns columns = res.getColumns();
UInt64 num_rows = chunk.getNumRows();
/// Enrich with virtual columns.

View File

@ -0,0 +1,6 @@
test_02270_2.csv
test_02270_2.csv
test_02270_2.csv
test_02270_2.csv
test_02270_2.csv.gz
test_02270_2.csv.gz

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
[ -e "${CLICKHOUSE_TMP}"/test_02270_1.csv ] && rm "${CLICKHOUSE_TMP}"/test_02270_1.csv
[ -e "${CLICKHOUSE_TMP}"/test_02270_2.csv ] && rm "${CLICKHOUSE_TMP}"/test_02270_2.csv
echo "Hello,World" > "${CLICKHOUSE_TMP}"/test_02270_1.csv
echo "Error" > "${CLICKHOUSE_TMP}"/test_02270_2.csv
${CLICKHOUSE_LOCAL} --query "SELECT * FROM file('${CLICKHOUSE_TMP}/test_02270*.csv', CSV, 'a String, b String')" 2>&1 | grep -o "test_02270_2.csv"
${CLICKHOUSE_LOCAL} --query "SELECT * FROM file('${CLICKHOUSE_TMP}/test_02270*.csv', CSV, 'a String, b String')" --input_format_parallel_parsing 0 2>&1 | grep -o "test_02270_2.csv"
user_files_path=$($CLICKHOUSE_CLIENT --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep -E '^Code: 107.*FILE_DOESNT_EXIST' | head -1 | awk '{gsub("/nonexist.txt","",$9); print $9}')
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE FUNCTION file('test_02270_1.csv') SELECT 'Hello', 'World'"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE FUNCTION file('test_02270_2.csv') SELECT 'Error'"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM file('test_02270*.csv', 'CSV', 'a String, b String')" 2>&1 | grep -o -m1 "test_02270_2.csv"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM file('test_02270*.csv', 'CSV', 'a String, b String')" --input_format_parallel_parsing 0 2>&1 | grep -o -m1 "test_02270_2.csv"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE FUNCTION file('test_02270_1.csv.gz') SELECT 'Hello', 'World'"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE FUNCTION file('test_02270_2.csv.gz') SELECT 'Error'"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM file('test_02270*.csv.gz', 'CSV', 'a String, b String')" 2>&1 | grep -o -m1 "test_02270_2.csv.gz"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM file('test_02270*.csv.gz', 'CSV', 'a String, b String')" --input_format_parallel_parsing 0 2>&1 | grep -o -m1 "test_02270_2.csv.gz"
rm "${CLICKHOUSE_TMP}"/test_02270_1.csv
rm "${CLICKHOUSE_TMP}"/test_02270_2.csv
rm "${user_files_path}"/test_02270_1.csv
rm "${user_files_path}"/test_02270_2.csv
rm "${user_files_path}"/test_02270_1.csv.gz
rm "${user_files_path}"/test_02270_2.csv.gz

View File

@ -0,0 +1,2 @@
http://localhost:11111/test/tsv_with_header.tsv
test/tsv_with_header.tsv

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "SELECT * FROM url('http://localhost:11111/test/{a,tsv_with_header}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64')" 2>&1 | grep -o -m1 "http://localhost:11111/test/tsv_with_header.tsv"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM s3('http://localhost:11111/test/{a,tsv_with_header}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64')" 2>&1 | grep -o -m1 "test/tsv_with_header.tsv"