Fix tests

This commit is contained in:
avogar 2021-11-11 20:27:23 +03:00
parent c521a9131a
commit 51831afff8
23 changed files with 26 additions and 105 deletions

View File

@ -6,7 +6,6 @@
#include "CompressedWriteBuffer.h"
#include <Compression/CompressionFactory.h>
#include <Common/MemoryTracker.h>
namespace DB
@ -35,10 +34,7 @@ void CompressedWriteBuffer::nextImpl()
CompressedWriteBuffer::~CompressedWriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
CompressedWriteBuffer::CompressedWriteBuffer(

View File

@ -4,8 +4,6 @@
# include <IO/BrotliWriteBuffer.h>
# include <brotli/encode.h>
#include <Common/MemoryTracker.h>
namespace DB
{
@ -46,10 +44,7 @@ BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int comp
BrotliWriteBuffer::~BrotliWriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void BrotliWriteBuffer::nextImpl()

View File

@ -47,10 +47,7 @@ Bzip2WriteBuffer::Bzip2WriteBuffer(std::unique_ptr<WriteBuffer> out_, int compre
Bzip2WriteBuffer::~Bzip2WriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void Bzip2WriteBuffer::nextImpl()

View File

@ -1,5 +1,4 @@
#include <IO/LZMADeflatingWriteBuffer.h>
#include <Common/MemoryTracker.h>
namespace DB
{
@ -47,10 +46,7 @@ LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void LZMADeflatingWriteBuffer::nextImpl()

View File

@ -1,6 +1,5 @@
#include <IO/Lz4DeflatingWriteBuffer.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
namespace DB
@ -44,10 +43,7 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void Lz4DeflatingWriteBuffer::nextImpl()

View File

@ -7,6 +7,7 @@
#include <cassert>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <IO/BufferBase.h>
@ -16,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
extern const int LOGICAL_ERROR;
}
@ -106,6 +108,8 @@ public:
if (finalized)
return;
/// finalize() is often called from destructors.
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalized = true;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <utility>
#include <memory>

View File

@ -1,7 +1,6 @@
#include <IO/WriteBufferFromEncryptedFile.h>
#if USE_SSL
#include <Common/MemoryTracker.h>
namespace DB
{
@ -22,10 +21,7 @@ WriteBufferFromEncryptedFile::WriteBufferFromEncryptedFile(
WriteBufferFromEncryptedFile::~WriteBufferFromEncryptedFile()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void WriteBufferFromEncryptedFile::finalizeBeforeNestedFinalize()

View File

@ -3,7 +3,6 @@
#include <errno.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
@ -72,10 +71,8 @@ WriteBufferFromFile::WriteBufferFromFile(
WriteBufferFromFile::~WriteBufferFromFile()
{
if (finalized || fd < 0)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
::close(fd);
}
void WriteBufferFromFile::finalizeImpl()
@ -84,8 +81,6 @@ void WriteBufferFromFile::finalizeImpl()
return;
next();
::close(fd);
}

View File

@ -1,14 +1,12 @@
#include <unistd.h>
#include <errno.h>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
@ -96,10 +94,7 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void WriteBufferFromFileDescriptor::finalizeImpl()

View File

@ -52,7 +52,7 @@ protected:
/// If file has name contains filename, otherwise contains string "(fd=...)"
std::string file_name;
private:
void finalizeImpl() override;
};

View File

@ -1,5 +1,4 @@
#include <IO/WriteBufferFromOStream.h>
#include <Common/MemoryTracker.h>
namespace DB
@ -42,10 +41,7 @@ WriteBufferFromOStream::WriteBufferFromOStream(
WriteBufferFromOStream::~WriteBufferFromOStream()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
}

View File

@ -5,7 +5,6 @@
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
@ -83,10 +82,7 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
}

View File

@ -4,7 +4,6 @@
# include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h>
# include <Common/MemoryTracker.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/CreateMultipartUploadRequest.h>
@ -86,10 +85,7 @@ void WriteBufferFromS3::allocateBuffer()
WriteBufferFromS3::~WriteBufferFromS3()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void WriteBufferFromS3::finalizeImpl()
@ -163,7 +159,7 @@ void WriteBufferFromS3::writePart()
{
auto etag = outcome.GetResult().GetETag();
part_tags.push_back(etag);
LOG_DEBUG(log, "Writing part finalizeImpl()ed. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, etag, part_tags.size());
LOG_DEBUG(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, etag, part_tags.size());
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);

View File

@ -3,7 +3,6 @@
#include <vector>
#include <IO/WriteBuffer.h>
#include <Common/MemoryTracker.h>
namespace DB
@ -60,10 +59,7 @@ public:
~WriteBufferFromVector() override
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
private:

View File

@ -1,6 +1,5 @@
#include <Poco/UTF8Encoding.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Common/MemoryTracker.h>
#include <base/types.h>
#ifdef __SSE2__
@ -125,10 +124,7 @@ void WriteBufferValidUTF8::nextImpl()
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void WriteBufferValidUTF8::finalizeImpl()

View File

@ -1,5 +1,4 @@
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
@ -78,10 +77,7 @@ void ZlibDeflatingWriteBuffer::nextImpl()
ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void ZlibDeflatingWriteBuffer::finalizeBeforeNestedFinalize()

View File

@ -1,5 +1,4 @@
#include <IO/ZstdDeflatingAppendableWriteBuffer.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
namespace DB
@ -78,10 +77,7 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
{
if (finalized || first_write)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()

View File

@ -1,5 +1,4 @@
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
namespace DB
@ -30,10 +29,7 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void ZstdDeflatingWriteBuffer::nextImpl()

View File

@ -2,7 +2,6 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ISource.h>
#include <QueryPipeline/QueryPipeline.h>
#include <iostream>
namespace DB

View File

@ -3,14 +3,6 @@
#include <IO/HTTPCommon.h>
#include <IO/Progress.h>
#include <IO/WriteBufferFromString.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <Common/config.h>
#include <Poco/Version.h>
namespace DB
@ -170,10 +162,7 @@ void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
finalize();
}
void WriteBufferFromHTTPServerResponse::finalizeImpl()

View File

@ -1,7 +1,6 @@
#include <QueryPipeline/RemoteInserter.h>
#include <Formats/NativeReader.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentMetrics.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/SipHash.h>

View File

@ -6,7 +6,6 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
#include <IO/WriteBufferFromFile.h>
@ -21,11 +20,9 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
@ -35,9 +32,6 @@
#include <base/range.h>
#include <base/scope_guard.h>
#include <future>
#include <condition_variable>
#include <mutex>
#include <filesystem>
@ -741,6 +735,7 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
stream.write(block);
compress.finalize();
out.finalize();
if (fsync)
out.sync();