Skip to content
This repository was archived by the owner on Jun 1, 2024. It is now read-only.

Make sure the sink (and app) do not crash when the ES is unreachable … #359

Merged
merged 2 commits into from
Sep 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .ionide/symbolCache.db
Binary file not shown.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## Changelog

8.3
* Do not crash when ES is unreachable and the option `DetectElasticsearchVersion` is set to true.

* Disable dot-escaping for field names, because ELK already supports dots in field names.
* Support for explicitly setting `Options.TypeName` to `null` this will remove the
deprecated `_type` from the bulk payload being sent to Elastic. Earlier an exception was
Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"sdk": {
"version": "2.1.500"
"version": "2.1.807"
}
}
2 changes: 1 addition & 1 deletion sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void Main(string[] args)
NumberOfReplicas = 1,
NumberOfShards = 2,
//BufferBaseFilename = "./buffer",
RegisterTemplateFailure = RegisterTemplateRecovery.FailSink,
// RegisterTemplateFailure = RegisterTemplateRecovery.FailSink,
FailureCallback = e => Console.WriteLine("Unable to submit event " + e.MessageTemplate),
EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog |
EmitEventFailureHandling.WriteToFailureSink |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,25 @@ public void DiscoverClusterVersion()
{
if (!_options.DetectElasticsearchVersion) return;

var response = _client.Cat.Nodes<StringResponse>(new CatNodesRequestParameters()
try
{
Headers = new[] { "v" }
});
if (!response.Success) return;

_discoveredVersion = response.Body.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries)
.FirstOrDefault();
var response = _client.Cat.Nodes<StringResponse>(new CatNodesRequestParameters()
{
Headers = new[] { "v" }
});
if (!response.Success) return;

_discoveredVersion = response.Body.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries)
.FirstOrDefault();

if (_discoveredVersion?.StartsWith("7.") ?? false)
_options.TypeName = "_doc";
if (_discoveredVersion?.StartsWith("7.") ?? false)
_options.TypeName = "_doc";
}
catch (Exception ex)
{
SelfLog.WriteLine("Failed to discover the cluster version. {0}", ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public abstract class ElasticsearchSinkTestsBase
protected readonly ElasticsearchSinkOptions _options;
protected List<string> _seenHttpPosts = new List<string>();
protected List<int> _seenHttpHeads = new List<int>();
protected List<Tuple<Uri, int>> _seenHttpGets = new List<Tuple<Uri, int>>();
protected List<Tuple<Uri, string>> _seenHttpPuts = new List<Tuple<Uri, string>>();
private IElasticsearchSerializer _serializer;

Expand All @@ -32,10 +33,11 @@ protected ElasticsearchSinkTestsBase()
{
_seenHttpPosts = new List<string>();
_seenHttpHeads = new List<int>();
_seenHttpGets = new List<Tuple<Uri,int>>();
_seenHttpPuts = new List<Tuple<Uri, string>>();

var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
_connection = new ConnectionStub(_seenHttpPosts, _seenHttpHeads, _seenHttpPuts, () => _templateExistsReturnCode);
_connection = new ConnectionStub(_seenHttpPosts, _seenHttpHeads, _seenHttpPuts, _seenHttpGets, () => _templateExistsReturnCode);
_serializer = JsonNetSerializer.Default(LowLevelRequestResponseSerializer.Instance, new ConnectionSettings(connectionPool, _connection));

_options = new ElasticsearchSinkOptions(connectionPool)
Expand Down Expand Up @@ -119,19 +121,22 @@ public class ConnectionStub : InMemoryConnection
{
private Func<int> _templateExistReturnCode;
private List<int> _seenHttpHeads;
private List<Tuple<Uri, int>> _seenHttpGets;
private List<string> _seenHttpPosts;
private List<Tuple<Uri, string>> _seenHttpPuts;

public ConnectionStub(
List<string> _seenHttpPosts,
List<int> _seenHttpHeads,
List<Tuple<Uri, string>> _seenHttpPuts,
List<Tuple<Uri, int>> _seenHttpGets,
Func<int> templateExistReturnCode
)
{
this._seenHttpPosts = _seenHttpPosts;
this._seenHttpHeads = _seenHttpHeads;
this._seenHttpPuts = _seenHttpPuts;
this._seenHttpGets = _seenHttpGets;
this._templateExistReturnCode = templateExistReturnCode;
}

Expand All @@ -149,6 +154,9 @@ public override TReturn Request<TReturn>(RequestData requestData)
case HttpMethod.POST:
_seenHttpPosts.Add(Encoding.UTF8.GetString(ms.ToArray()));
break;
case HttpMethod.GET:
_seenHttpGets.Add(Tuple.Create(requestData.Uri, this._templateExistReturnCode()));
break;
case HttpMethod.HEAD:
_seenHttpHeads.Add(this._templateExistReturnCode());
break;
Expand All @@ -172,6 +180,9 @@ public override async Task<TResponse> RequestAsync<TResponse>(RequestData reques
case HttpMethod.POST:
_seenHttpPosts.Add(Encoding.UTF8.GetString(ms.ToArray()));
break;
case HttpMethod.GET:
_seenHttpGets.Add(Tuple.Create(requestData.Uri, this._templateExistReturnCode()));
break;
case HttpMethod.HEAD:
_seenHttpHeads.Add(this._templateExistReturnCode());
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.IO;
using System.Text;
using FluentAssertions;
using Xunit;
using Serilog.Debugging;

namespace Serilog.Sinks.Elasticsearch.Tests.Templating
{
[Collection("isolation")]
public class DiscoverVersionHandlesUnavailableServerTests : ElasticsearchSinkTestsBase
{
[Fact]
public void Should_not_crash_when_server_is_unavaiable()
{
// If this crashes, the test will fail
CreateLoggerThatCrashes();
}

[Fact]
public void Should_write_error_to_self_log()
{
var selfLogMessages = new StringBuilder();
SelfLog.Enable(new StringWriter(selfLogMessages));

// Exception occurs on creation - should be logged
CreateLoggerThatCrashes();

var selfLogContents = selfLogMessages.ToString();
selfLogContents.Should().Contain("Failed to discover the cluster version");

}

private static ILogger CreateLoggerThatCrashes()
{
var loggerConfig = new LoggerConfiguration()
.WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://localhost:9199"))
{
DetectElasticsearchVersion = true
});

return loggerConfig.CreateLogger();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using FluentAssertions;
using Xunit;

namespace Serilog.Sinks.Elasticsearch.Tests.Templating
{
public class DiscoverVersionTests : ElasticsearchSinkTestsBase
{
private readonly Tuple<Uri,int> _templateGet;

public DiscoverVersionTests()
{
_options.DetectElasticsearchVersion = true;

var loggerConfig = new LoggerConfiguration()
.MinimumLevel.Debug()
.Enrich.WithMachineName()
.WriteTo.ColoredConsole()
.WriteTo.Elasticsearch(_options);

var logger = loggerConfig.CreateLogger();
using ((IDisposable) logger)
{
logger.Error("Test exception. Should not contain an embedded exception object.");
}

this._seenHttpGets.Should().NotBeNullOrEmpty().And.HaveCount(1);
_templateGet = this._seenHttpGets[0];
}


[Fact]
public void TemplatePutToCorrectUrl()
{
var uri = _templateGet.Item1;
uri.AbsolutePath.Should().Be("/_cat/nodes");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Serilog.Sinks.Elasticsearch.Tests.Templating
public class SendsTemplateHandlesUnavailableServerTests : ElasticsearchSinkTestsBase
{
[Fact]
public void Should_not_crash_when_server_is_unavaiable()
public void Should_not_crash_when_server_is_unavailable()
{
// If this crashes, the test will fail
CreateLoggerThatCrashes();
Expand Down