diff --git a/CHANGES.md b/CHANGES.md
index cbaea3e2..ef6b24dc 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,11 @@
== Changelog
-
+7.0
+ * DurableElasticsearchSink is rewritten to use the same base code as the sink for Serilog.Sinks.Seq. Nuget Serilog.Sinks.File is now used instead of deprecated Serilog.Sinks.RollingFile. Lots of new fintuning options for file storage is added in ElasticsearchSinkOptions. Updated Serilog.Sinks.Elasticsearch.Sample.Main with SetupLoggerWithPersistantStorage with all available options for durable mode.
+ * Changed datatype on singleEventSizePostingLimit from int to long? with default value null. to make it possible ro reuse code from Sinks.Seq .
+ * IndexDecider didnt worked well in buffer mode because of LogEvent was null. Added BufferIndexDecider.
+ * Added BufferCleanPayload and an example which makes it possible to cleanup your invalid logging document if rejected from elastic because of inconsistent datatype on a field. It'seasy to miss errors in the self log now its possible to se logrows which is bad for elasticsearch in the elastic log.
+ * Added BufferRetainedInvalidPayloadsLimitBytes A soft limit for the number of bytes to use for storing failed requests.
+ * Added BufferFileCountLimit The maximum number of log files that will be retained.
6.4
* Render message by default (#160).
* Expose interface-typed options via appsettings (#162)
diff --git a/README.md b/README.md
index 18fa5061..c61fd4bd 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,8 @@ This example shows the options that are currently available when using the appSe
+
+
@@ -147,6 +149,8 @@ In your `appsettings.json` file, under the `Serilog` node, :
"bufferBaseFilename": "C:/Temp/LogDigipolis/docker-elk-serilog-web-buffer",
"bufferFileSizeLimitBytes": 5242880,
"bufferLogShippingInterval": 5000,
+ "bufferRetainedInvalidPayloadsLimitBytes": 5000,
+ "bufferFileCountLimit": 31,
"connectionGlobalHeaders" :"Authorization=Bearer SOME-TOKEN;OtherHeader=OTHER-HEADER-VALUE",
"connectionTimeout": 5,
"emitEventFailure": "WriteToSelfLog",
@@ -203,7 +207,37 @@ Since version 5.5 you can use the RegisterTemplateFailure option. Set it to one
- IndexToDeadletterIndex; using the deadletterindex format, it will write the events to the deadletter queue. When you fix your template mapping, you can copy your data into the right index.
- FailSink; this will simply fail the sink by raising an exception.
+ Since version 7 you can specify an action to do when log row was denied by the elasticsearch because of the data (payload) if durable file is specied.
+ i.e.
+```csharp
+BufferCleanPayload = (failingEvent, statuscode, exception) =>
+ {
+ dynamic e = JObject.Parse(failingEvent);
+ return JsonConvert.SerializeObject(new Dictionary()
+ {
+ { "@timestamp",e["@timestamp"]},
+ { "level","Error"},
+ { "message","Error: "+e.message},
+ { "messageTemplate",e.messageTemplate},
+ { "failingStatusCode", statuscode},
+ { "failingException", exception}
+ });
+ },
+
+```
+The IndexDecider didnt worked well when durable file was specified so an option to specify BufferIndexDecider is added.
+Datatype of logEvent is string
+i.e.
+```csharp
+ BufferIndexDecider = (logEvent, offset) => "log-serilog-" + (new Random().Next(0, 2)),
+```
+Option BufferFileCountLimit is added. The maximum number of log files that will be retained. including the current log file. For unlimited retention, pass null. The default is 31.
+Option BufferFileSizeLimitBytes is added The maximum size, in bytes, to which the buffer log file for a specific date will be allowed to grow. By default 100L * 1024 * 1024 will be applied.
+### Breaking changes for version 7
+Nuget Serilog.Sinks.File is now used instead of deprecated Serilog.Sinks.RollingFile
+SingleEventSizePostingLimit option is changed from int to long? with default value null, Don't use value 0 nothing will be logged then!!!!!
+
### Breaking changes for version 6
Starting from version 6, the sink has been upgraded to work with Elasticsearch 6.0 and has support for the new templates used by ES 6.
diff --git a/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs b/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
index 6742883d..2681091e 100644
--- a/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
+++ b/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
@@ -1,36 +1,57 @@
using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Reflection.Metadata.Ecma335;
+using System.Threading;
+using Microsoft.Extensions.Configuration;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
using Serilog;
+using Serilog.Core;
using Serilog.Debugging;
+using Serilog.Events;
using Serilog.Formatting.Json;
-using Serilog.Sinks.RollingFile;
+using Serilog.Sinks.File;
using Serilog.Sinks.SystemConsole.Themes;
namespace Serilog.Sinks.Elasticsearch.Sample
{
class Program
{
+ private static IConfiguration Configuration { get; } = new ConfigurationBuilder()
+ .SetBasePath(Directory.GetCurrentDirectory())
+ .AddJsonFile("appsettings.json", true, true)
+ .AddEnvironmentVariables()
+ .Build();
static void Main(string[] args)
{
+
+ // Enable the selflog output
+ SelfLog.Enable(Console.Error);
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console(theme: SystemConsoleTheme.Literate)
- .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://elastic:changeme@localhost:9200")) // for the docker-compose implementation
- {
- AutoRegisterTemplate = true,
- //BufferBaseFilename = "./buffer",
- RegisterTemplateFailure = RegisterTemplateRecovery.IndexAnyway,
- FailureCallback = e => Console.WriteLine("Unable to submit event " + e.MessageTemplate),
- EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog |
- EmitEventFailureHandling.WriteToFailureSink |
- EmitEventFailureHandling.RaiseCallback,
- FailureSink = new RollingFileSink("./fail-{Date}.txt", new JsonFormatter(), null, null)
- })
- .CreateLogger();
+ //not persistant
+ .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri(Configuration.GetConnectionString("elasticsearch"))) // for the docker-compose implementation
+ {
+ AutoRegisterTemplate = true,
+ //BufferBaseFilename = "./buffer",
+ RegisterTemplateFailure = RegisterTemplateRecovery.IndexAnyway,
+ FailureCallback = e => Console.WriteLine("Unable to submit event " + e.MessageTemplate),
+ EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog |
+ EmitEventFailureHandling.WriteToFailureSink |
+ EmitEventFailureHandling.RaiseCallback,
+ FailureSink = new FileSink("./fail-{Date}.txt", new JsonFormatter(), null, null)
+ })
+ .CreateLogger();
- // Enable the selflog output
- SelfLog.Enable(Console.Error);
+ //SetupLoggerWithSimplePersistantStorage();
+ //LoggingLevelSwitch levelSwitch = SetupLoggerWithPersistantStorage();
+ //Log.Debug("To high loglevel default is Information this will not be logged");
+ //levelSwitch.MinimumLevel = LogEventLevel.Debug;
Log.Information("Hello, world!");
+ //Log.Information("To big log row bigger than SingleEventSizePostingLimit ! {a}", new string('*', 5000));
int a = 10, b = 0;
try
@@ -47,7 +68,70 @@ static void Main(string[] args)
Log.Debug("Reusing {A} by {B}", "string", true);
Log.CloseAndFlush();
- Console.Read();
+ Console.WriteLine("Press any key to continue...");
+ while (!Console.KeyAvailable)
+ {
+ Thread.Sleep(500);
+ }
+ }
+
+ private static LoggingLevelSwitch SetupLoggerWithPersistantStorage()
+ {
+ //persistant storage with all settings available for this mode
+ //please note that all limit settings here is set verry low for test, default values should usually work best!
+ var levelSwitch = new LoggingLevelSwitch();
+ Log.Logger = new LoggerConfiguration()
+ .MinimumLevel.Debug()
+ .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri(Configuration.GetConnectionString("elasticsearch")))
+ {
+ AutoRegisterTemplate = true,
+ BufferBaseFilename = "./buffer/logserilog",
+ IndexFormat = "log-serilog-{0:yyyy.MM}",
+ RegisterTemplateFailure = RegisterTemplateRecovery.FailSink,
+ BufferCleanPayload = (failingEvent, statuscode, exception) =>
+ {
+ dynamic e = JObject.Parse(failingEvent);
+ return JsonConvert.SerializeObject(new Dictionary()
+ {
+ { "@timestamp",e["@timestamp"]},
+ { "level",e.level},
+ { "message","Error: "+e.message},
+ { "messageTemplate",e.messageTemplate},
+ { "failingStatusCode", statuscode},
+ { "failingException", exception}
+ });
+ },
+ OverwriteTemplate = true,
+ NumberOfShards = 1,
+ NumberOfReplicas = 1,
+ GetTemplateContent = null,
+ AutoRegisterTemplateVersion = AutoRegisterTemplateVersion.ESv6,
+ PipelineName = null,
+ TypeName = "logevent",
+ BufferIndexDecider = (logEvent, offset) => "log-serilog-" + (new Random().Next(0, 2)),
+ BatchPostingLimit = 50,
+ BufferLogShippingInterval = TimeSpan.FromSeconds(5),
+ SingleEventSizePostingLimit = 1000,
+ LevelSwitch = levelSwitch,
+ BufferRetainedInvalidPayloadsLimitBytes = 2000,
+ BufferFileSizeLimitBytes = 2000,
+ BufferFileCountLimit = 2
+ })
+ .CreateLogger();
+ return levelSwitch;
+ }
+
+ private static void SetupLoggerWithSimplePersistantStorage()
+ {
+ //presistant
+ Log.Logger = new LoggerConfiguration()
+ .MinimumLevel.Debug()
+ .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri(Configuration.GetConnectionString("elasticsearch")))
+ {
+ BufferBaseFilename = "./buffer/logserilogsimple",
+ IndexFormat = "log-serilog-simple-{0:yyyy.MM}"
+ })
+ .CreateLogger();
}
}
}
diff --git a/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj b/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj
index eb530de1..970e4d9b 100644
--- a/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj
+++ b/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj
@@ -1,11 +1,15 @@
-
+Exe
- netcoreapp1.1
+ netcoreapp2.1
+
+
+
+
@@ -14,4 +18,10 @@
+
+
+ Always
+
+
+
diff --git a/sample/Serilog.Sinks.Elasticsearch.Sample/appsettings.json b/sample/Serilog.Sinks.Elasticsearch.Sample/appsettings.json
new file mode 100644
index 00000000..f3586043
--- /dev/null
+++ b/sample/Serilog.Sinks.Elasticsearch.Sample/appsettings.json
@@ -0,0 +1,5 @@
+{
+ "ConnectionStrings": {
+ "elasticsearch": "http://elastic:changeme@localhost:9200"
+ }
+}
diff --git a/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs b/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs
index 471d2d27..fb0b4250 100644
--- a/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs
+++ b/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs
@@ -24,6 +24,7 @@
using System.ComponentModel;
using Elasticsearch.Net;
using Serilog.Formatting;
+using Serilog.Sinks.Elasticsearch.Durable;
namespace Serilog
{
@@ -119,6 +120,7 @@ public static LoggerConfiguration Elasticsearch(
/// A switch allowing the pass-through minimum level to be changed at runtime.
///
///
+ ///
///
/// A comma or semi column separated list of key value pairs of headers to be added to each elastic http request
/// The connection timeout (in seconds) when sending bulk operations to elasticsearch (defaults to 5).
@@ -139,7 +141,7 @@ public static LoggerConfiguration Elasticsearch(
/// Customizes the formatter used when converting log events into ElasticSearch documents. Please note that the formatter output must be valid JSON :)
/// Customizes the formatter used when converting log events into the durable sink. Please note that the formatter output must be valid JSON :)
/// Sink to use when Elasticsearch is unable to accept the events. This is optionally and depends on the EmitEventFailure setting.
- /// The maximum length of an event allowed to be posted to Elasticsearch.
+ /// The maximum length of an event allowed to be posted to Elasticsearch.default null
/// LoggerConfiguration object
/// is .
public static LoggerConfiguration Elasticsearch(
@@ -153,7 +155,7 @@ public static LoggerConfiguration Elasticsearch(
bool inlineFields = false,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum,
string bufferBaseFilename = null,
- long? bufferFileSizeLimitBytes = null,
+ long? bufferFileSizeLimitBytes = null,
long bufferLogShippingInterval = 5000,
string connectionGlobalHeaders = null,
LoggingLevelSwitch levelSwitch = null,
@@ -175,7 +177,8 @@ public static LoggerConfiguration Elasticsearch(
ITextFormatter customFormatter = null,
ITextFormatter customDurableFormatter = null,
ILogEventSink failureSink = null,
- int singleEventSizePostingLimit = 0)
+ long? singleEventSizePostingLimit = null,
+ int? bufferFileCountLimit = null)
{
if (string.IsNullOrEmpty(nodeUris))
throw new ArgumentNullException(nameof(nodeUris), "No Elasticsearch node(s) specified.");
@@ -220,6 +223,10 @@ public static LoggerConfiguration Elasticsearch(
options.BufferFileSizeLimitBytes = bufferFileSizeLimitBytes.Value;
}
+ if (bufferFileCountLimit.HasValue)
+ {
+ options.BufferFileCountLimit = bufferFileCountLimit.Value;
+ }
options.BufferLogShippingInterval = TimeSpan.FromMilliseconds(bufferLogShippingInterval);
if (!string.IsNullOrWhiteSpace(connectionGlobalHeaders))
diff --git a/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj b/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj
index d3418bbb..2e01dd0f 100644
--- a/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj
+++ b/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj
@@ -28,12 +28,16 @@
false
+
+
+
+
+
-
@@ -41,10 +45,26 @@
+
+ 1591;1701;1702
+ $(DefineConstants);DURABLE;THREADING_TIMER
+
+
+
+ 1591;1701;1702
+ $(DefineConstants);DURABLE;THREADING_TIMER;HRESULTS
+
+
+ 1591;1701;1702$(DefineConstants);DOTNETCORE;NO_SERIALIZATION;NO_TIMER
+
+ 1591;1701;1702
+ NU1605
+
+
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/CrossPlatform/PortableTimer.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/CrossPlatform/PortableTimer.cs
deleted file mode 100644
index 2a71c893..00000000
--- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/CrossPlatform/PortableTimer.cs
+++ /dev/null
@@ -1,113 +0,0 @@
-// Copyright 2013-2016 Serilog Contributors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#if NO_TIMER
-
-using Serilog.Debugging;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Serilog.Sinks.Elasticsearch.CrossPlatform
-{
- class PortableTimer : IDisposable
- {
- enum PortableTimerState
- {
- NotWaiting,
- Waiting,
- Active,
- Disposed
- }
-
- readonly object _stateLock = new object();
- PortableTimerState _state = PortableTimerState.NotWaiting;
-
- readonly Action _onTick;
- readonly CancellationTokenSource _cancel = new CancellationTokenSource();
-
- public PortableTimer(Action onTick)
- {
- if (onTick == null) throw new ArgumentNullException(nameof(onTick));
- _onTick = onTick;
- }
-
- public async void Start(TimeSpan interval)
- {
- if (interval < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(interval));
-
- lock (_stateLock)
- {
- if (_state == PortableTimerState.Disposed)
- throw new ObjectDisposedException("PortableTimer");
-
- // There's a little bit of raciness here, but it's needed to support the
- // current API, which allows the tick handler to reenter and set the next interval.
-
- if (_state == PortableTimerState.Waiting)
- throw new InvalidOperationException("The timer is already set.");
-
- if (_cancel.IsCancellationRequested) return;
-
- _state = PortableTimerState.Waiting;
- }
-
- try
- {
- if (interval > TimeSpan.Zero)
- await Task.Delay(interval, _cancel.Token).ConfigureAwait(false);
-
- _state = PortableTimerState.Active;
-
- if (!_cancel.Token.IsCancellationRequested)
- {
- _onTick(_cancel.Token);
- }
- }
- catch (TaskCanceledException tcx)
- {
- SelfLog.WriteLine("The timer was canceled during invocation: {0}", tcx);
- }
- finally
- {
- lock (_stateLock)
- _state = PortableTimerState.NotWaiting;
- }
- }
-
- public void Dispose()
- {
- _cancel.Cancel();
-
- while (true)
- {
- lock (_stateLock)
- {
- if (_state == PortableTimerState.Disposed ||
- _state == PortableTimerState.NotWaiting)
- {
- _state = PortableTimerState.Disposed;
- return;
- }
- }
-
-// On the very old platforms, we've got no option but to spin here.
-#if THREAD
- Thread.Sleep(10);
-#endif
- }
- }
- }
-}
-#endif
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/APayloadReader.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/APayloadReader.cs
new file mode 100644
index 00000000..0a0af421
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/APayloadReader.cs
@@ -0,0 +1,120 @@
+// Serilog.Sinks.Seq Copyright 2017 Serilog Contributors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#if DURABLE
+
+using System;
+using System.IO;
+using System.Text;
+using Serilog.Debugging;
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ ///
+ /// Abstract payload reader
+ /// Generic version of https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/Durable/PayloadReader.cs
+ ///
+ ///
+ public abstract class APayloadReader : IPayloadReader
+ {
+ ///
+ ///
+ ///
+ ///
+ public abstract TPayload GetNoPayload();
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public TPayload ReadPayload(int batchPostingLimit, long? eventBodyLimitBytes, ref FileSetPosition position, ref int count,string fileName)
+ {
+ InitPayLoad(fileName);
+
+ using (var current = System.IO.File.Open(position.File, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
+ {
+ var nextLineStart = position.NextLineStart;
+ while (count < batchPostingLimit && TryReadLine(current, ref nextLineStart, out var nextLine))
+ {
+ position = new FileSetPosition(nextLineStart, position.File);
+
+ // Count is the indicator that work was done, so advances even in the (rare) case an
+ // oversized event is dropped.
+ ++count;
+
+ if (eventBodyLimitBytes.HasValue && Encoding.UTF8.GetByteCount(nextLine) > eventBodyLimitBytes.Value)
+ {
+ SelfLog.WriteLine(
+ "Event JSON representation exceeds the byte size limit of {0} and will be dropped; data: {1}",
+ eventBodyLimitBytes, nextLine);
+ }
+ else
+ {
+ AddToPayLoad(nextLine);
+ }
+ }
+ }
+ return FinishPayLoad();
+ }
+ ///
+ ///
+ ///
+ ///
+ protected abstract void InitPayLoad(string fileName);
+ ///
+ ///
+ ///
+ ///
+ protected abstract TPayload FinishPayLoad();
+ ///
+ ///
+ ///
+ ///
+ protected abstract void AddToPayLoad(string nextLine);
+
+ // It would be ideal to chomp whitespace here, but not required.
+ private static bool TryReadLine(Stream current, ref long nextStart, out string nextLine)
+ {
+ var includesBom = nextStart == 0;
+
+ if (current.Length <= nextStart)
+ {
+ nextLine = null;
+ return false;
+ }
+
+ current.Position = nextStart;
+
+ // Important not to dispose this StreamReader as the stream must remain open.
+ var reader = new StreamReader(current, Encoding.UTF8, false, 128);
+ nextLine = reader.ReadLine();
+
+ if (nextLine == null)
+ return false;
+
+ nextStart += Encoding.UTF8.GetByteCount(nextLine) + Encoding.UTF8.GetByteCount(Environment.NewLine);
+ if (includesBom)
+ nextStart += 3;
+
+ return true;
+ }
+ }
+}
+
+#endif
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/BookmarkFile.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/BookmarkFile.cs
new file mode 100644
index 00000000..9d35c192
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/BookmarkFile.cs
@@ -0,0 +1,80 @@
+// Serilog.Sinks.Seq Copyright 2017 Serilog Contributors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#if DURABLE
+
+using System;
+using System.IO;
+using System.Text;
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ ///
+ /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/Durable/BookmarkFile.cs
+ ///
+ sealed class BookmarkFile : IDisposable
+ {
+ readonly FileStream _bookmark;
+
+ public BookmarkFile(string bookmarkFilename)
+ {
+ _bookmark = System.IO.File.Open(bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
+ }
+
+ public FileSetPosition TryReadBookmark()
+ {
+ if (_bookmark.Length != 0)
+ {
+ _bookmark.Position = 0;
+
+ // Important not to dispose this StreamReader as the stream must remain open.
+ var reader = new StreamReader(_bookmark, Encoding.UTF8, false, 128);
+ var current = reader.ReadLine();
+
+ if (current != null)
+ {
+ var parts = current.Split(new[] { ":::" }, StringSplitOptions.RemoveEmptyEntries);
+ if (parts.Length == 2)
+ {
+ return new FileSetPosition(long.Parse(parts[0]), parts[1]);
+ }
+ }
+ }
+
+ return FileSetPosition.None;
+ }
+
+ public void WriteBookmark(FileSetPosition bookmark)
+ {
+ if (bookmark.File == null)
+ return;
+
+ // Don't need to truncate, since we only ever read a single line and
+ // writes are always newline-terminated
+ _bookmark.Position = 0;
+
+ // Cannot dispose, as `leaveOpen` is not available on all target platforms
+ var writer = new StreamWriter(_bookmark);
+ writer.WriteLine("{0}:::{1}", bookmark.NextLineStart, bookmark.File);
+ writer.Flush();
+ }
+
+ public void Dispose()
+ {
+ _bookmark.Dispose();
+ }
+ }
+}
+
+#endif
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ControlledLevelSwitch.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ControlledLevelSwitch.cs
new file mode 100644
index 00000000..5e6321c3
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ControlledLevelSwitch.cs
@@ -0,0 +1,73 @@
+// Serilog.Sinks.Seq Copyright 2016 Serilog Contributors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Serilog.Core;
+using Serilog.Events;
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ ///
+ /// Instances of this type are single-threaded, generally only updated on a background
+ /// timer thread. An exception is , which may be called
+ /// concurrently but performs no synchronization.
+ /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/ControlledLevelSwitch.cs
+ ///
+ class ControlledLevelSwitch
+ {
+ // If non-null, then background level checks will be performed; set either through the constructor
+ // or in response to a level specification from the server. Never set to null after being made non-null.
+ LoggingLevelSwitch _controlledSwitch;
+ LogEventLevel? _originalLevel;
+
+ public ControlledLevelSwitch(LoggingLevelSwitch controlledSwitch = null)
+ {
+ _controlledSwitch = controlledSwitch;
+ }
+
+ public bool IsActive => _controlledSwitch != null;
+
+ public bool IsIncluded(LogEvent evt)
+ {
+ // Concurrent, but not synchronized.
+ var controlledSwitch = _controlledSwitch;
+ return controlledSwitch == null ||
+ (int)controlledSwitch.MinimumLevel <= (int)evt.Level;
+ }
+
+ public void Update(LogEventLevel? minimumAcceptedLevel)
+ {
+ if (minimumAcceptedLevel == null)
+ {
+ if (_controlledSwitch != null && _originalLevel.HasValue)
+ _controlledSwitch.MinimumLevel = _originalLevel.Value;
+
+ return;
+ }
+
+ if (_controlledSwitch == null)
+ {
+ // The server is controlling the logging level, but not the overall logger. Hence, if the server
+ // stops controlling the level, the switch should become transparent.
+ _originalLevel = LevelAlias.Minimum;
+ _controlledSwitch = new LoggingLevelSwitch(minimumAcceptedLevel.Value);
+ return;
+ }
+
+ if (!_originalLevel.HasValue)
+ _originalLevel = _controlledSwitch.MinimumLevel;
+
+ _controlledSwitch.MinimumLevel = minimumAcceptedLevel.Value;
+ }
+ }
+}
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticSearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticSearchSink.cs
new file mode 100644
index 00000000..9ac18737
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticSearchSink.cs
@@ -0,0 +1,98 @@
+// Copyright 2014 Serilog Contributors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Elasticsearch.Net;
+using Serilog.Core;
+using Serilog.Events;
+
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ class DurableElasticsearchSink : ILogEventSink, IDisposable
+ {
+ // we rely on the date in the filename later!
+ const string FileNameSuffix = "-.json";
+
+ readonly Logger _sink;
+ readonly LogShipper> _shipper;
+ readonly ElasticsearchSinkState _state;
+
+ public DurableElasticsearchSink(ElasticsearchSinkOptions options)
+ {
+ _state = ElasticsearchSinkState.Create(options);
+
+ if (string.IsNullOrWhiteSpace(options.BufferBaseFilename))
+ {
+ throw new ArgumentException("Cannot create the durable ElasticSearch sink without a buffer base file name!");
+ }
+
+
+ _sink = new LoggerConfiguration()
+ .MinimumLevel.Verbose()
+ .WriteTo.File(_state.DurableFormatter,
+ options.BufferBaseFilename + FileNameSuffix,
+ rollingInterval: RollingInterval.Day,
+ fileSizeLimitBytes: options.BufferFileSizeLimitBytes,
+ rollOnFileSizeLimit: true,
+ retainedFileCountLimit: options.BufferFileCountLimit,
+ levelSwitch: _state.Options.LevelSwitch,
+ encoding: Encoding.UTF8)
+ .CreateLogger();
+
+
+ var elasticSearchLogClient = new ElasticsearchLogClient(
+ elasticLowLevelClient: _state.Client,
+ cleanPayload: _state.Options.BufferCleanPayload);
+
+ var payloadReader = new ElasticsearchPayloadReader(
+ pipelineName: _state.Options.PipelineName,
+ typeName:_state.Options.TypeName,
+ serialize:_state.Serialize,
+ getIndexForEvent: _state.GetBufferedIndexForEvent
+ );
+
+ _shipper = new ElasticsearchLogShipper(
+ bufferBaseFilename: _state.Options.BufferBaseFilename,
+ batchPostingLimit: _state.Options.BatchPostingLimit,
+ period: _state.Options.BufferLogShippingInterval ?? TimeSpan.FromSeconds(5),
+ eventBodyLimitBytes: _state.Options.SingleEventSizePostingLimit,
+ levelControlSwitch: _state.Options.LevelSwitch,
+ logClient: elasticSearchLogClient,
+ payloadReader: payloadReader,
+ retainedInvalidPayloadsLimitBytes: _state.Options.BufferRetainedInvalidPayloadsLimitBytes,
+ bufferSizeLimitBytes: _state.Options.BufferFileSizeLimitBytes,
+ registerTemplateIfNeeded: _state.RegisterTemplateIfNeeded);
+
+ }
+
+ public void Emit(LogEvent logEvent)
+ {
+ // This is a lagging indicator, but the network bandwidth usage benefits
+ // are worth the ambiguity.
+ if (_shipper.IsIncluded(logEvent))
+ {
+ _sink.Write(logEvent);
+ }
+ }
+
+ public void Dispose()
+ {
+ _sink.Dispose();
+ _shipper.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticsearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticsearchSink.cs
new file mode 100644
index 00000000..9ac18737
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticsearchSink.cs
@@ -0,0 +1,98 @@
+// Copyright 2014 Serilog Contributors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Elasticsearch.Net;
+using Serilog.Core;
+using Serilog.Events;
+
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ class DurableElasticsearchSink : ILogEventSink, IDisposable
+ {
+ // we rely on the date in the filename later!
+ const string FileNameSuffix = "-.json";
+
+ readonly Logger _sink;
+ readonly LogShipper> _shipper;
+ readonly ElasticsearchSinkState _state;
+
+ public DurableElasticsearchSink(ElasticsearchSinkOptions options)
+ {
+ _state = ElasticsearchSinkState.Create(options);
+
+ if (string.IsNullOrWhiteSpace(options.BufferBaseFilename))
+ {
+ throw new ArgumentException("Cannot create the durable ElasticSearch sink without a buffer base file name!");
+ }
+
+
+ _sink = new LoggerConfiguration()
+ .MinimumLevel.Verbose()
+ .WriteTo.File(_state.DurableFormatter,
+ options.BufferBaseFilename + FileNameSuffix,
+ rollingInterval: RollingInterval.Day,
+ fileSizeLimitBytes: options.BufferFileSizeLimitBytes,
+ rollOnFileSizeLimit: true,
+ retainedFileCountLimit: options.BufferFileCountLimit,
+ levelSwitch: _state.Options.LevelSwitch,
+ encoding: Encoding.UTF8)
+ .CreateLogger();
+
+
+ var elasticSearchLogClient = new ElasticsearchLogClient(
+ elasticLowLevelClient: _state.Client,
+ cleanPayload: _state.Options.BufferCleanPayload);
+
+ var payloadReader = new ElasticsearchPayloadReader(
+ pipelineName: _state.Options.PipelineName,
+ typeName:_state.Options.TypeName,
+ serialize:_state.Serialize,
+ getIndexForEvent: _state.GetBufferedIndexForEvent
+ );
+
+ _shipper = new ElasticsearchLogShipper(
+ bufferBaseFilename: _state.Options.BufferBaseFilename,
+ batchPostingLimit: _state.Options.BatchPostingLimit,
+ period: _state.Options.BufferLogShippingInterval ?? TimeSpan.FromSeconds(5),
+ eventBodyLimitBytes: _state.Options.SingleEventSizePostingLimit,
+ levelControlSwitch: _state.Options.LevelSwitch,
+ logClient: elasticSearchLogClient,
+ payloadReader: payloadReader,
+ retainedInvalidPayloadsLimitBytes: _state.Options.BufferRetainedInvalidPayloadsLimitBytes,
+ bufferSizeLimitBytes: _state.Options.BufferFileSizeLimitBytes,
+ registerTemplateIfNeeded: _state.RegisterTemplateIfNeeded);
+
+ }
+
+ public void Emit(LogEvent logEvent)
+ {
+ // This is a lagging indicator, but the network bandwidth usage benefits
+ // are worth the ambiguity.
+ if (_shipper.IsIncluded(logEvent))
+ {
+ _sink.Write(logEvent);
+ }
+ }
+
+ public void Dispose()
+ {
+ _sink.Dispose();
+ _shipper.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogClient.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogClient.cs
new file mode 100644
index 00000000..6b2d45bf
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogClient.cs
@@ -0,0 +1,125 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.ExceptionServices;
+using System.Text;
+using System.Text.RegularExpressions;
+using System.Threading.Tasks;
+using Elasticsearch.Net;
+using Serilog.Debugging;
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ ///
+ ///
+ ///
+ public class ElasticsearchLogClient : ILogClient>
+ {
+ private readonly IElasticLowLevelClient _elasticLowLevelClient;
+ private readonly Func _cleanPayload;
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ public ElasticsearchLogClient(IElasticLowLevelClient elasticLowLevelClient,
+ Func cleanPayload)
+ {
+ _elasticLowLevelClient = elasticLowLevelClient;
+ _cleanPayload = cleanPayload;
+ }
+
+ public async Task SendPayloadAsync(List payload)
+ {
+ return await SendPayloadAsync(payload, true);
+ }
+
+ public async Task SendPayloadAsync(List payload,bool first)
+ {
+ try
+ {
+ if (payload == null || !payload.Any()) return new SentPayloadResult(null, true);
+ var response = await _elasticLowLevelClient.BulkAsync(PostData.MultiJson(payload));
+
+ if (response.Success)
+ {
+ var cleanPayload = new List();
+ var invalidPayload = GetInvalidPayloadAsync(response, payload,out cleanPayload);
+ if ((cleanPayload?.Any() ?? false) && first)
+ {
+ await SendPayloadAsync(cleanPayload,false);
+ }
+
+ return new SentPayloadResult(response, true, invalidPayload);
+ }
+ else
+ {
+ SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}", response.HttpStatusCode,
+ response.OriginalException);
+ return new SentPayloadResult(response, false,
+ new InvalidResult()
+ {
+ StatusCode = response.HttpStatusCode ?? 500,
+ Content = response.OriginalException.ToString()
+ });
+ }
+ }
+ catch (Exception ex)
+ {
+ SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex);
+ return new SentPayloadResult(null, false, null, ex);
+ }
+
+
+ }
+
+ private InvalidResult GetInvalidPayloadAsync(DynamicResponse baseResult, List payload, out List cleanPayload)
+ {
+ int i = 0;
+ cleanPayload = new List();
+ var items = baseResult.Body["items"];
+ if (items == null) return null;
+ List badPayload = new List();
+
+ bool hasErrors = false;
+ foreach (dynamic item in items)
+ {
+ long? status = item.index?.status;
+ i++;
+ if (!status.HasValue || status < 300)
+ {
+ continue;
+ }
+
+ hasErrors = true;
+ var id = item.index?._id;
+ var error = item.index?.error;
+ if (int.TryParse(id.Split('_')[0], out int index))
+ {
+ SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}. Failed payload : {2}.", status, error?.ToString(), payload.ElementAt(index * 2 + 1));
+ badPayload.Add(payload.ElementAt(index * 2));
+ badPayload.Add(payload.ElementAt(index * 2 + 1));
+ if (_cleanPayload != null)
+ {
+ cleanPayload.Add(payload.ElementAt(index * 2));
+ cleanPayload.Add(_cleanPayload(payload.ElementAt(index * 2 + 1), status, error?.ToString()));
+ }
+ }
+ else
+ {
+ SelfLog.WriteLine($"Received failed ElasticSearch shipping result {status}: {error?.ToString()}.");
+ }
+ }
+
+ if (!hasErrors)
+ return null;
+ return new InvalidResult()
+ {
+ StatusCode = baseResult.HttpStatusCode ?? 500,
+ Content = baseResult.ToString(),
+ BadPayLoad = String.Join(Environment.NewLine, badPayload)
+ };
+ }
+ }
+}
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogShipper.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogShipper.cs
new file mode 100644
index 00000000..6a6b8b26
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogShipper.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Serilog.Core;
+using Serilog.Debugging;
+using Serilog.Sinks.Elasticsearch.Durable;
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ ///
+ ///
+ ///
+ public class ElasticsearchLogShipper : LogShipper>
+ {
+ private readonly Action _registerTemplateIfNeeded;
+ bool _didRegisterTemplateIfNeeded = false;
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public ElasticsearchLogShipper(string bufferBaseFilename, int batchPostingLimit, TimeSpan period,
+ long? eventBodyLimitBytes, LoggingLevelSwitch levelControlSwitch, ILogClient> logClient,
+ IPayloadReader> payloadReader, long? retainedInvalidPayloadsLimitBytes,
+ long? bufferSizeLimitBytes, Action registerTemplateIfNeeded)
+ : base(bufferBaseFilename, batchPostingLimit, period, eventBodyLimitBytes,
+ levelControlSwitch, logClient, payloadReader, retainedInvalidPayloadsLimitBytes, bufferSizeLimitBytes)
+ {
+ _registerTemplateIfNeeded = registerTemplateIfNeeded;
+ }
+
+ ///
+ ///
+ ///
+ ///
+ protected override async Task OnTick()
+ {
+ bool success = true;
+ try
+ {
+ if (!_didRegisterTemplateIfNeeded)
+ {
+ if (_registerTemplateIfNeeded != null)
+ {
+ _registerTemplateIfNeeded();
+ _didRegisterTemplateIfNeeded = true;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex);
+ _connectionSchedule.MarkFailure();
+ success = false;
+ }
+ if (success)
+ await base.OnTick();
+ }
+ }
+}
diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchPayloadReader.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchPayloadReader.cs
new file mode 100644
index 00000000..a31b253e
--- /dev/null
+++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchPayloadReader.cs
@@ -0,0 +1,100 @@
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Serilog.Sinks.Elasticsearch.Durable
+{
+ ///
+ ///
+ ///
+ public class ElasticsearchPayloadReader: APayloadReader>
+ {
+ private readonly string _pipelineName;
+ private readonly string _typeName;
+ private readonly Func