diff --git a/CHANGES.md b/CHANGES.md
index cbaea3e2..ef6b24dc 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,11 @@
 == Changelog
-
+7.0
+ * DurableElasticsearchSink is rewritten to use the same base code as the sink for Serilog.Sinks.Seq. The Serilog.Sinks.File NuGet package is now used instead of the deprecated Serilog.Sinks.RollingFile. Many new fine-tuning options for file storage have been added to ElasticsearchSinkOptions. Updated Serilog.Sinks.Elasticsearch.Sample.Main with SetupLoggerWithPersistantStorage, showing all available options for durable mode.
+ * Changed the datatype of singleEventSizePostingLimit from int to long? with a default value of null, to make it possible to reuse code from Sinks.Seq.
+ * IndexDecider did not work well in buffer mode because LogEvent was null. Added BufferIndexDecider.
+ * Added BufferCleanPayload and an example, which makes it possible to clean up an invalid logging document when it is rejected by Elasticsearch because of an inconsistent datatype on a field. It is easy to miss errors in the self log; now the log rows that Elasticsearch rejects can be seen in the Elasticsearch log itself.
+ * Added BufferRetainedInvalidPayloadsLimitBytes, a soft limit for the number of bytes to use for storing failed requests.
+ * Added BufferFileCountLimit, the maximum number of log files that will be retained.
 6.4
  * Render message by default (#160).
  * Expose interface-typed options via appsettings (#162)
diff --git a/README.md b/README.md
index 18fa5061..c61fd4bd 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,8 @@ This example shows the options that are currently available when using the appSettings reader.
+
+
@@ -147,6 +149,8 @@ In your `appsettings.json` file, under the `Serilog` node, :
        "bufferBaseFilename": "C:/Temp/LogDigipolis/docker-elk-serilog-web-buffer",
        "bufferFileSizeLimitBytes": 5242880,
        "bufferLogShippingInterval": 5000,
+       "bufferRetainedInvalidPayloadsLimitBytes": 5000,
+       "bufferFileCountLimit": 31,
        "connectionGlobalHeaders" :"Authorization=Bearer SOME-TOKEN;OtherHeader=OTHER-HEADER-VALUE",
        "connectionTimeout": 5,
        "emitEventFailure": "WriteToSelfLog",
@@ -203,7 +207,37 @@ Since version 5.5 you can use the RegisterTemplateFailure option. Set it to one
 - IndexToDeadletterIndex; using the deadletterindex format, it will write the events to the deadletter queue. When you fix your template mapping, you can copy your data into the right index.
 - FailSink; this will simply fail the sink by raising an exception.
 
+Since version 7 you can specify an action to take when a log row is rejected by Elasticsearch because of its data (payload), provided a durable buffer file is specified. For example:
+```csharp
+BufferCleanPayload = (failingEvent, statuscode, exception) =>
+    {
+        dynamic e = JObject.Parse(failingEvent);
+        return JsonConvert.SerializeObject(new Dictionary<string, object>()
+        {
+            { "@timestamp", e["@timestamp"] },
+            { "level", "Error" },
+            { "message", "Error: " + e.message },
+            { "messageTemplate", e.messageTemplate },
+            { "failingStatusCode", statuscode },
+            { "failingException", exception }
+        });
+    },
+```
+The IndexDecider did not work well when a durable buffer file was specified, so the BufferIndexDecider option has been added.
+Note that the datatype of logEvent here is string. For example:
+```csharp
+BufferIndexDecider = (logEvent, offset) => "log-serilog-" + (new Random().Next(0, 2)),
+```
+The option BufferFileCountLimit has been added: the maximum number of log files that will be retained, including the current log file. For unlimited retention, pass null. The default is 31.
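A minimal sketch of how these durable-buffer options can be wired up in code; the option names come from ElasticsearchSinkOptions as used elsewhere in this PR, while the URI and the limit values are illustrative only:

```csharp
// Illustrative values only: durable (file-buffered) mode is enabled by BufferBaseFilename.
Log.Logger = new LoggerConfiguration()
    .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://localhost:9200"))
    {
        BufferBaseFilename = "./buffer/log",
        BufferFileCountLimit = 31,                        // keep at most 31 buffer files (null = unlimited)
        BufferRetainedInvalidPayloadsLimitBytes = 5000,   // soft cap for stored failed payloads
        BufferFileSizeLimitBytes = 100L * 1024 * 1024     // per-date buffer file size limit
    })
    .CreateLogger();
```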
+The option BufferFileSizeLimitBytes has been added: the maximum size, in bytes, to which the buffer log file for a specific date will be allowed to grow. By default 100L * 1024 * 1024 will be applied.
+### Breaking changes for version 7
+The Serilog.Sinks.File NuGet package is now used instead of the deprecated Serilog.Sinks.RollingFile.
+The SingleEventSizePostingLimit option has changed from int to long? with a default value of null. Do not set it to 0, because nothing will be logged in that case!
+
 ### Breaking changes for version 6
 Starting from version 6, the sink has been upgraded to work with Elasticsearch 6.0 and has support for the new templates used by ES 6.
diff --git a/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs b/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
index 6742883d..2681091e 100644
--- a/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
+++ b/sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
@@ -1,36 +1,57 @@
 using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Reflection.Metadata.Ecma335;
+using System.Threading;
+using Microsoft.Extensions.Configuration;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
 using Serilog;
+using Serilog.Core;
 using Serilog.Debugging;
+using Serilog.Events;
 using Serilog.Formatting.Json;
-using Serilog.Sinks.RollingFile;
+using Serilog.Sinks.File;
 using Serilog.Sinks.SystemConsole.Themes;
 
 namespace Serilog.Sinks.Elasticsearch.Sample
 {
     class Program
     {
+        private static IConfiguration Configuration { get; } = new ConfigurationBuilder()
+            .SetBasePath(Directory.GetCurrentDirectory())
+            .AddJsonFile("appsettings.json", true, true)
+            .AddEnvironmentVariables()
+            .Build();
         static void Main(string[] args)
         {
+
+            // Enable the selflog output
+            SelfLog.Enable(Console.Error);
             Log.Logger = new LoggerConfiguration()
                 .MinimumLevel.Debug()
                 .WriteTo.Console(theme: SystemConsoleTheme.Literate)
-                .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://elastic:changeme@localhost:9200")) // for the docker-compose implementation
-                {
-                    AutoRegisterTemplate = true,
-                    //BufferBaseFilename = "./buffer",
-                    RegisterTemplateFailure = RegisterTemplateRecovery.IndexAnyway,
-                    FailureCallback = e => Console.WriteLine("Unable to submit event " + e.MessageTemplate),
-                    EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog |
-                                       EmitEventFailureHandling.WriteToFailureSink |
-                                       EmitEventFailureHandling.RaiseCallback,
-                    FailureSink = new RollingFileSink("./fail-{Date}.txt", new JsonFormatter(), null, null)
-                })
-                .CreateLogger();
+                //not persistent
+                .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri(Configuration.GetConnectionString("elasticsearch"))) // for the docker-compose implementation
+                {
+                    AutoRegisterTemplate = true,
+                    //BufferBaseFilename = "./buffer",
+                    RegisterTemplateFailure = RegisterTemplateRecovery.IndexAnyway,
+                    FailureCallback = e => Console.WriteLine("Unable to submit event " + e.MessageTemplate),
+                    EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog |
+                                       EmitEventFailureHandling.WriteToFailureSink |
+                                       EmitEventFailureHandling.RaiseCallback,
+                    FailureSink = new FileSink("./fail-{Date}.txt", new JsonFormatter(), null, null)
+                })
+                .CreateLogger();
 
-            // Enable the selflog output
-            SelfLog.Enable(Console.Error);
+            //SetupLoggerWithSimplePersistantStorage();
+            //LoggingLevelSwitch levelSwitch = SetupLoggerWithPersistantStorage();
+            //Log.Debug("Too high a log level; the default is Information, so this will not be logged");
+            //levelSwitch.MinimumLevel = LogEventLevel.Debug;
 
             Log.Information("Hello, world!");
+            //Log.Information("Too big log row, bigger than SingleEventSizePostingLimit! {a}", new string('*', 5000));
 
             int a = 10, b = 0;
 
             try
@@ -47,7 +68,70 @@ static void Main(string[] args)
             Log.Debug("Reusing {A} by {B}", "string", true);
 
             Log.CloseAndFlush();
-            Console.Read();
+            Console.WriteLine("Press any key to continue...");
+            while (!Console.KeyAvailable)
+            {
+                Thread.Sleep(500);
+            }
+        }
+
+        private static LoggingLevelSwitch SetupLoggerWithPersistantStorage()
+        {
+            //persistent storage with all settings available for this mode
+            //please note that all limit settings here are set very low for testing; the default values should usually work best!
+            var levelSwitch = new LoggingLevelSwitch();
+            Log.Logger = new LoggerConfiguration()
+                .MinimumLevel.Debug()
+                .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri(Configuration.GetConnectionString("elasticsearch")))
+                {
+                    AutoRegisterTemplate = true,
+                    BufferBaseFilename = "./buffer/logserilog",
+                    IndexFormat = "log-serilog-{0:yyyy.MM}",
+                    RegisterTemplateFailure = RegisterTemplateRecovery.FailSink,
+                    BufferCleanPayload = (failingEvent, statuscode, exception) =>
+                    {
+                        dynamic e = JObject.Parse(failingEvent);
+                        return JsonConvert.SerializeObject(new Dictionary<string, object>()
+                        {
+                            { "@timestamp", e["@timestamp"] },
+                            { "level", e.level },
+                            { "message", "Error: " + e.message },
+                            { "messageTemplate", e.messageTemplate },
+                            { "failingStatusCode", statuscode },
+                            { "failingException", exception }
+                        });
+                    },
+                    OverwriteTemplate = true,
+                    NumberOfShards = 1,
+                    NumberOfReplicas = 1,
+                    GetTemplateContent = null,
+                    AutoRegisterTemplateVersion = AutoRegisterTemplateVersion.ESv6,
+                    PipelineName = null,
+                    TypeName = "logevent",
+                    BufferIndexDecider = (logEvent, offset) => "log-serilog-" + (new Random().Next(0, 2)),
+                    BatchPostingLimit = 50,
+                    BufferLogShippingInterval = TimeSpan.FromSeconds(5),
+                    SingleEventSizePostingLimit = 1000,
+                    LevelSwitch = levelSwitch,
+                    BufferRetainedInvalidPayloadsLimitBytes = 2000,
+                    BufferFileSizeLimitBytes = 2000,
+                    BufferFileCountLimit = 2
+                })
+                .CreateLogger();
+            return levelSwitch;
+        }
+
+        private static void SetupLoggerWithSimplePersistantStorage()
+        {
+            //persistent
+            Log.Logger = new LoggerConfiguration()
+                .MinimumLevel.Debug()
+                .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri(Configuration.GetConnectionString("elasticsearch")))
+                {
+                    BufferBaseFilename = "./buffer/logserilogsimple",
+                    IndexFormat = "log-serilog-simple-{0:yyyy.MM}"
+                })
+                .CreateLogger();
+        }
     }
 }
diff --git a/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj b/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj
index eb530de1..970e4d9b 100644
--- a/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj
+++ b/sample/Serilog.Sinks.Elasticsearch.Sample/Serilog.Sinks.Elasticsearch.Sample.csproj
@@ -1,11 +1,15 @@ - + Exe - netcoreapp1.1 + netcoreapp2.1 + + + + @@ -14,4 +18,10 @@ + + + Always + + +
diff --git a/sample/Serilog.Sinks.Elasticsearch.Sample/appsettings.json b/sample/Serilog.Sinks.Elasticsearch.Sample/appsettings.json
new file mode 100644
index 00000000..f3586043
--- /dev/null
+++ b/sample/Serilog.Sinks.Elasticsearch.Sample/appsettings.json
@@ -0,0 +1,5 @@
+{
+  "ConnectionStrings": {
+    "elasticsearch": "http://elastic:changeme@localhost:9200"
+  }
+}
diff --git a/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs b/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs
index 471d2d27..fb0b4250 100644 ---
a/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs +++ b/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs @@ -24,6 +24,7 @@ using System.ComponentModel; using Elasticsearch.Net; using Serilog.Formatting; +using Serilog.Sinks.Elasticsearch.Durable; namespace Serilog { @@ -119,6 +120,7 @@ public static LoggerConfiguration Elasticsearch( /// A switch allowing the pass-through minimum level to be changed at runtime. /// /// + /// /// /// A comma or semi column separated list of key value pairs of headers to be added to each elastic http request /// The connection timeout (in seconds) when sending bulk operations to elasticsearch (defaults to 5). @@ -139,7 +141,7 @@ public static LoggerConfiguration Elasticsearch( /// Customizes the formatter used when converting log events into ElasticSearch documents. Please note that the formatter output must be valid JSON :) /// Customizes the formatter used when converting log events into the durable sink. Please note that the formatter output must be valid JSON :) /// Sink to use when Elasticsearch is unable to accept the events. This is optionally and depends on the EmitEventFailure setting. - /// The maximum length of an event allowed to be posted to Elasticsearch. + /// The maximum length of an event allowed to be posted to Elasticsearch.default null /// LoggerConfiguration object /// is . public static LoggerConfiguration Elasticsearch( @@ -153,7 +155,7 @@ public static LoggerConfiguration Elasticsearch( bool inlineFields = false, LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum, string bufferBaseFilename = null, - long? bufferFileSizeLimitBytes = null, + long? bufferFileSizeLimitBytes = null, long bufferLogShippingInterval = 5000, string connectionGlobalHeaders = null, LoggingLevelSwitch levelSwitch = null, @@ -175,7 +177,8 @@ public static LoggerConfiguration Elasticsearch( ITextFormatter customFormatter = null, ITextFormatter customDurableFormatter = null, ILogEventSink failureSink = null, - int singleEventSizePostingLimit = 0) + long? singleEventSizePostingLimit = null, + int? 
bufferFileCountLimit = null) { if (string.IsNullOrEmpty(nodeUris)) throw new ArgumentNullException(nameof(nodeUris), "No Elasticsearch node(s) specified."); @@ -220,6 +223,10 @@ public static LoggerConfiguration Elasticsearch( options.BufferFileSizeLimitBytes = bufferFileSizeLimitBytes.Value; } + if (bufferFileCountLimit.HasValue) + { + options.BufferFileCountLimit = bufferFileCountLimit.Value; + } options.BufferLogShippingInterval = TimeSpan.FromMilliseconds(bufferLogShippingInterval); if (!string.IsNullOrWhiteSpace(connectionGlobalHeaders)) diff --git a/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj b/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj index d3418bbb..2e01dd0f 100644 --- a/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj +++ b/src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj @@ -28,12 +28,16 @@ false + + + + + - @@ -41,10 +45,26 @@ + + 1591;1701;1702 + $(DefineConstants);DURABLE;THREADING_TIMER + + + + 1591;1701;1702 + $(DefineConstants);DURABLE;THREADING_TIMER;HRESULTS + + + 1591;1701;1702 $(DefineConstants);DOTNETCORE;NO_SERIALIZATION;NO_TIMER + + 1591;1701;1702 + NU1605 + + diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/CrossPlatform/PortableTimer.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/CrossPlatform/PortableTimer.cs deleted file mode 100644 index 2a71c893..00000000 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/CrossPlatform/PortableTimer.cs +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2013-2016 Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#if NO_TIMER - -using Serilog.Debugging; -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Serilog.Sinks.Elasticsearch.CrossPlatform -{ - class PortableTimer : IDisposable - { - enum PortableTimerState - { - NotWaiting, - Waiting, - Active, - Disposed - } - - readonly object _stateLock = new object(); - PortableTimerState _state = PortableTimerState.NotWaiting; - - readonly Action _onTick; - readonly CancellationTokenSource _cancel = new CancellationTokenSource(); - - public PortableTimer(Action onTick) - { - if (onTick == null) throw new ArgumentNullException(nameof(onTick)); - _onTick = onTick; - } - - public async void Start(TimeSpan interval) - { - if (interval < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(interval)); - - lock (_stateLock) - { - if (_state == PortableTimerState.Disposed) - throw new ObjectDisposedException("PortableTimer"); - - // There's a little bit of raciness here, but it's needed to support the - // current API, which allows the tick handler to reenter and set the next interval. 
- - if (_state == PortableTimerState.Waiting) - throw new InvalidOperationException("The timer is already set."); - - if (_cancel.IsCancellationRequested) return; - - _state = PortableTimerState.Waiting; - } - - try - { - if (interval > TimeSpan.Zero) - await Task.Delay(interval, _cancel.Token).ConfigureAwait(false); - - _state = PortableTimerState.Active; - - if (!_cancel.Token.IsCancellationRequested) - { - _onTick(_cancel.Token); - } - } - catch (TaskCanceledException tcx) - { - SelfLog.WriteLine("The timer was canceled during invocation: {0}", tcx); - } - finally - { - lock (_stateLock) - _state = PortableTimerState.NotWaiting; - } - } - - public void Dispose() - { - _cancel.Cancel(); - - while (true) - { - lock (_stateLock) - { - if (_state == PortableTimerState.Disposed || - _state == PortableTimerState.NotWaiting) - { - _state = PortableTimerState.Disposed; - return; - } - } - -// On the very old platforms, we've got no option but to spin here. -#if THREAD - Thread.Sleep(10); -#endif - } - } - } -} -#endif diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/APayloadReader.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/APayloadReader.cs new file mode 100644 index 00000000..0a0af421 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/APayloadReader.cs @@ -0,0 +1,120 @@ +// Serilog.Sinks.Seq Copyright 2017 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if DURABLE + +using System; +using System.IO; +using System.Text; +using Serilog.Debugging; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// Abstract payload reader + /// Generic version of https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/Durable/PayloadReader.cs + /// + /// + public abstract class APayloadReader : IPayloadReader + { + /// + /// + /// + /// + public abstract TPayload GetNoPayload(); + + /// + /// + /// + /// + /// + /// + /// + /// + /// + public TPayload ReadPayload(int batchPostingLimit, long? eventBodyLimitBytes, ref FileSetPosition position, ref int count,string fileName) + { + InitPayLoad(fileName); + + using (var current = System.IO.File.Open(position.File, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + var nextLineStart = position.NextLineStart; + while (count < batchPostingLimit && TryReadLine(current, ref nextLineStart, out var nextLine)) + { + position = new FileSetPosition(nextLineStart, position.File); + + // Count is the indicator that work was done, so advances even in the (rare) case an + // oversized event is dropped. 
+ ++count; + + if (eventBodyLimitBytes.HasValue && Encoding.UTF8.GetByteCount(nextLine) > eventBodyLimitBytes.Value) + { + SelfLog.WriteLine( + "Event JSON representation exceeds the byte size limit of {0} and will be dropped; data: {1}", + eventBodyLimitBytes, nextLine); + } + else + { + AddToPayLoad(nextLine); + } + } + } + return FinishPayLoad(); + } + /// + /// + /// + /// + protected abstract void InitPayLoad(string fileName); + /// + /// + /// + /// + protected abstract TPayload FinishPayLoad(); + /// + /// + /// + /// + protected abstract void AddToPayLoad(string nextLine); + + // It would be ideal to chomp whitespace here, but not required. + private static bool TryReadLine(Stream current, ref long nextStart, out string nextLine) + { + var includesBom = nextStart == 0; + + if (current.Length <= nextStart) + { + nextLine = null; + return false; + } + + current.Position = nextStart; + + // Important not to dispose this StreamReader as the stream must remain open. + var reader = new StreamReader(current, Encoding.UTF8, false, 128); + nextLine = reader.ReadLine(); + + if (nextLine == null) + return false; + + nextStart += Encoding.UTF8.GetByteCount(nextLine) + Encoding.UTF8.GetByteCount(Environment.NewLine); + if (includesBom) + nextStart += 3; + + return true; + } + } +} + +#endif diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/BookmarkFile.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/BookmarkFile.cs new file mode 100644 index 00000000..9d35c192 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/BookmarkFile.cs @@ -0,0 +1,80 @@ +// Serilog.Sinks.Seq Copyright 2017 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if DURABLE + +using System; +using System.IO; +using System.Text; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/Durable/BookmarkFile.cs + /// + sealed class BookmarkFile : IDisposable + { + readonly FileStream _bookmark; + + public BookmarkFile(string bookmarkFilename) + { + _bookmark = System.IO.File.Open(bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + } + + public FileSetPosition TryReadBookmark() + { + if (_bookmark.Length != 0) + { + _bookmark.Position = 0; + + // Important not to dispose this StreamReader as the stream must remain open. 
+ var reader = new StreamReader(_bookmark, Encoding.UTF8, false, 128); + var current = reader.ReadLine(); + + if (current != null) + { + var parts = current.Split(new[] { ":::" }, StringSplitOptions.RemoveEmptyEntries); + if (parts.Length == 2) + { + return new FileSetPosition(long.Parse(parts[0]), parts[1]); + } + } + } + + return FileSetPosition.None; + } + + public void WriteBookmark(FileSetPosition bookmark) + { + if (bookmark.File == null) + return; + + // Don't need to truncate, since we only ever read a single line and + // writes are always newline-terminated + _bookmark.Position = 0; + + // Cannot dispose, as `leaveOpen` is not available on all target platforms + var writer = new StreamWriter(_bookmark); + writer.WriteLine("{0}:::{1}", bookmark.NextLineStart, bookmark.File); + writer.Flush(); + } + + public void Dispose() + { + _bookmark.Dispose(); + } + } +} + +#endif diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ControlledLevelSwitch.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ControlledLevelSwitch.cs new file mode 100644 index 00000000..5e6321c3 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ControlledLevelSwitch.cs @@ -0,0 +1,73 @@ +// Serilog.Sinks.Seq Copyright 2016 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Serilog.Core; +using Serilog.Events; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// Instances of this type are single-threaded, generally only updated on a background + /// timer thread. An exception is , which may be called + /// concurrently but performs no synchronization. + /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/ControlledLevelSwitch.cs + /// + class ControlledLevelSwitch + { + // If non-null, then background level checks will be performed; set either through the constructor + // or in response to a level specification from the server. Never set to null after being made non-null. + LoggingLevelSwitch _controlledSwitch; + LogEventLevel? _originalLevel; + + public ControlledLevelSwitch(LoggingLevelSwitch controlledSwitch = null) + { + _controlledSwitch = controlledSwitch; + } + + public bool IsActive => _controlledSwitch != null; + + public bool IsIncluded(LogEvent evt) + { + // Concurrent, but not synchronized. + var controlledSwitch = _controlledSwitch; + return controlledSwitch == null || + (int)controlledSwitch.MinimumLevel <= (int)evt.Level; + } + + public void Update(LogEventLevel? minimumAcceptedLevel) + { + if (minimumAcceptedLevel == null) + { + if (_controlledSwitch != null && _originalLevel.HasValue) + _controlledSwitch.MinimumLevel = _originalLevel.Value; + + return; + } + + if (_controlledSwitch == null) + { + // The server is controlling the logging level, but not the overall logger. Hence, if the server + // stops controlling the level, the switch should become transparent. 
+ _originalLevel = LevelAlias.Minimum; + _controlledSwitch = new LoggingLevelSwitch(minimumAcceptedLevel.Value); + return; + } + + if (!_originalLevel.HasValue) + _originalLevel = _controlledSwitch.MinimumLevel; + + _controlledSwitch.MinimumLevel = minimumAcceptedLevel.Value; + } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticSearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticSearchSink.cs new file mode 100644 index 00000000..9ac18737 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticSearchSink.cs @@ -0,0 +1,98 @@ +// Copyright 2014 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using Elasticsearch.Net; +using Serilog.Core; +using Serilog.Events; + + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + class DurableElasticsearchSink : ILogEventSink, IDisposable + { + // we rely on the date in the filename later! + const string FileNameSuffix = "-.json"; + + readonly Logger _sink; + readonly LogShipper> _shipper; + readonly ElasticsearchSinkState _state; + + public DurableElasticsearchSink(ElasticsearchSinkOptions options) + { + _state = ElasticsearchSinkState.Create(options); + + if (string.IsNullOrWhiteSpace(options.BufferBaseFilename)) + { + throw new ArgumentException("Cannot create the durable ElasticSearch sink without a buffer base file name!"); + } + + + _sink = new LoggerConfiguration() + .MinimumLevel.Verbose() + .WriteTo.File(_state.DurableFormatter, + options.BufferBaseFilename + FileNameSuffix, + rollingInterval: RollingInterval.Day, + fileSizeLimitBytes: options.BufferFileSizeLimitBytes, + rollOnFileSizeLimit: true, + retainedFileCountLimit: options.BufferFileCountLimit, + levelSwitch: _state.Options.LevelSwitch, + encoding: Encoding.UTF8) + .CreateLogger(); + + + var elasticSearchLogClient = new ElasticsearchLogClient( + elasticLowLevelClient: _state.Client, + cleanPayload: _state.Options.BufferCleanPayload); + + var payloadReader = new ElasticsearchPayloadReader( + pipelineName: _state.Options.PipelineName, + typeName:_state.Options.TypeName, + serialize:_state.Serialize, + getIndexForEvent: _state.GetBufferedIndexForEvent + ); + + _shipper = new ElasticsearchLogShipper( + bufferBaseFilename: _state.Options.BufferBaseFilename, + batchPostingLimit: _state.Options.BatchPostingLimit, + period: _state.Options.BufferLogShippingInterval ?? 
TimeSpan.FromSeconds(5), + eventBodyLimitBytes: _state.Options.SingleEventSizePostingLimit, + levelControlSwitch: _state.Options.LevelSwitch, + logClient: elasticSearchLogClient, + payloadReader: payloadReader, + retainedInvalidPayloadsLimitBytes: _state.Options.BufferRetainedInvalidPayloadsLimitBytes, + bufferSizeLimitBytes: _state.Options.BufferFileSizeLimitBytes, + registerTemplateIfNeeded: _state.RegisterTemplateIfNeeded); + + } + + public void Emit(LogEvent logEvent) + { + // This is a lagging indicator, but the network bandwidth usage benefits + // are worth the ambiguity. + if (_shipper.IsIncluded(logEvent)) + { + _sink.Write(logEvent); + } + } + + public void Dispose() + { + _sink.Dispose(); + _shipper.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticsearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticsearchSink.cs new file mode 100644 index 00000000..9ac18737 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/DurableElasticsearchSink.cs @@ -0,0 +1,98 @@ +// Copyright 2014 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using Elasticsearch.Net; +using Serilog.Core; +using Serilog.Events; + + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + class DurableElasticsearchSink : ILogEventSink, IDisposable + { + // we rely on the date in the filename later! + const string FileNameSuffix = "-.json"; + + readonly Logger _sink; + readonly LogShipper> _shipper; + readonly ElasticsearchSinkState _state; + + public DurableElasticsearchSink(ElasticsearchSinkOptions options) + { + _state = ElasticsearchSinkState.Create(options); + + if (string.IsNullOrWhiteSpace(options.BufferBaseFilename)) + { + throw new ArgumentException("Cannot create the durable ElasticSearch sink without a buffer base file name!"); + } + + + _sink = new LoggerConfiguration() + .MinimumLevel.Verbose() + .WriteTo.File(_state.DurableFormatter, + options.BufferBaseFilename + FileNameSuffix, + rollingInterval: RollingInterval.Day, + fileSizeLimitBytes: options.BufferFileSizeLimitBytes, + rollOnFileSizeLimit: true, + retainedFileCountLimit: options.BufferFileCountLimit, + levelSwitch: _state.Options.LevelSwitch, + encoding: Encoding.UTF8) + .CreateLogger(); + + + var elasticSearchLogClient = new ElasticsearchLogClient( + elasticLowLevelClient: _state.Client, + cleanPayload: _state.Options.BufferCleanPayload); + + var payloadReader = new ElasticsearchPayloadReader( + pipelineName: _state.Options.PipelineName, + typeName:_state.Options.TypeName, + serialize:_state.Serialize, + getIndexForEvent: _state.GetBufferedIndexForEvent + ); + + _shipper = new ElasticsearchLogShipper( + bufferBaseFilename: _state.Options.BufferBaseFilename, + batchPostingLimit: _state.Options.BatchPostingLimit, + period: _state.Options.BufferLogShippingInterval ?? 
TimeSpan.FromSeconds(5), + eventBodyLimitBytes: _state.Options.SingleEventSizePostingLimit, + levelControlSwitch: _state.Options.LevelSwitch, + logClient: elasticSearchLogClient, + payloadReader: payloadReader, + retainedInvalidPayloadsLimitBytes: _state.Options.BufferRetainedInvalidPayloadsLimitBytes, + bufferSizeLimitBytes: _state.Options.BufferFileSizeLimitBytes, + registerTemplateIfNeeded: _state.RegisterTemplateIfNeeded); + + } + + public void Emit(LogEvent logEvent) + { + // This is a lagging indicator, but the network bandwidth usage benefits + // are worth the ambiguity. + if (_shipper.IsIncluded(logEvent)) + { + _sink.Write(logEvent); + } + } + + public void Dispose() + { + _sink.Dispose(); + _shipper.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogClient.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogClient.cs new file mode 100644 index 00000000..6b2d45bf --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogClient.cs @@ -0,0 +1,125 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.ExceptionServices; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Elasticsearch.Net; +using Serilog.Debugging; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// + /// + public class ElasticsearchLogClient : ILogClient> + { + private readonly IElasticLowLevelClient _elasticLowLevelClient; + private readonly Func _cleanPayload; + + /// + /// + /// + /// + /// + public ElasticsearchLogClient(IElasticLowLevelClient elasticLowLevelClient, + Func cleanPayload) + { + _elasticLowLevelClient = elasticLowLevelClient; + _cleanPayload = cleanPayload; + } + + public async Task SendPayloadAsync(List payload) + { + return await SendPayloadAsync(payload, true); + } + + public async Task SendPayloadAsync(List payload,bool first) + { + try + { + if (payload == null || !payload.Any()) return new SentPayloadResult(null, true); + var response = await _elasticLowLevelClient.BulkAsync(PostData.MultiJson(payload)); + + if (response.Success) + { + var cleanPayload = new List(); + var invalidPayload = GetInvalidPayloadAsync(response, payload,out cleanPayload); + if ((cleanPayload?.Any() ?? false) && first) + { + await SendPayloadAsync(cleanPayload,false); + } + + return new SentPayloadResult(response, true, invalidPayload); + } + else + { + SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}", response.HttpStatusCode, + response.OriginalException); + return new SentPayloadResult(response, false, + new InvalidResult() + { + StatusCode = response.HttpStatusCode ?? 500, + Content = response.OriginalException.ToString() + }); + } + } + catch (Exception ex) + { + SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); + return new SentPayloadResult(null, false, null, ex); + } + + + } + + private InvalidResult GetInvalidPayloadAsync(DynamicResponse baseResult, List payload, out List cleanPayload) + { + int i = 0; + cleanPayload = new List(); + var items = baseResult.Body["items"]; + if (items == null) return null; + List badPayload = new List(); + + bool hasErrors = false; + foreach (dynamic item in items) + { + long? 
status = item.index?.status; + i++; + if (!status.HasValue || status < 300) + { + continue; + } + + hasErrors = true; + var id = item.index?._id; + var error = item.index?.error; + if (int.TryParse(id.Split('_')[0], out int index)) + { + SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}. Failed payload : {2}.", status, error?.ToString(), payload.ElementAt(index * 2 + 1)); + badPayload.Add(payload.ElementAt(index * 2)); + badPayload.Add(payload.ElementAt(index * 2 + 1)); + if (_cleanPayload != null) + { + cleanPayload.Add(payload.ElementAt(index * 2)); + cleanPayload.Add(_cleanPayload(payload.ElementAt(index * 2 + 1), status, error?.ToString())); + } + } + else + { + SelfLog.WriteLine($"Received failed ElasticSearch shipping result {status}: {error?.ToString()}."); + } + } + + if (!hasErrors) + return null; + return new InvalidResult() + { + StatusCode = baseResult.HttpStatusCode ?? 500, + Content = baseResult.ToString(), + BadPayLoad = String.Join(Environment.NewLine, badPayload) + }; + } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogShipper.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogShipper.cs new file mode 100644 index 00000000..6a6b8b26 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchLogShipper.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Serilog.Core; +using Serilog.Debugging; +using Serilog.Sinks.Elasticsearch.Durable; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// + /// + public class ElasticsearchLogShipper : LogShipper> + { + private readonly Action _registerTemplateIfNeeded; + bool _didRegisterTemplateIfNeeded = false; + + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public ElasticsearchLogShipper(string bufferBaseFilename, int batchPostingLimit, TimeSpan period, + long? eventBodyLimitBytes, LoggingLevelSwitch levelControlSwitch, ILogClient> logClient, + IPayloadReader> payloadReader, long? retainedInvalidPayloadsLimitBytes, + long? 
bufferSizeLimitBytes, Action registerTemplateIfNeeded) + : base(bufferBaseFilename, batchPostingLimit, period, eventBodyLimitBytes, + levelControlSwitch, logClient, payloadReader, retainedInvalidPayloadsLimitBytes, bufferSizeLimitBytes) + { + _registerTemplateIfNeeded = registerTemplateIfNeeded; + } + + /// + /// + /// + /// + protected override async Task OnTick() + { + bool success = true; + try + { + if (!_didRegisterTemplateIfNeeded) + { + if (_registerTemplateIfNeeded != null) + { + _registerTemplateIfNeeded(); + _didRegisterTemplateIfNeeded = true; + } + } + } + catch (Exception ex) + { + SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); + _connectionSchedule.MarkFailure(); + success = false; + } + if (success) + await base.OnTick(); + } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchPayloadReader.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchPayloadReader.cs new file mode 100644 index 00000000..a31b253e --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticSearchPayloadReader.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// + /// + public class ElasticsearchPayloadReader: APayloadReader> + { + private readonly string _pipelineName; + private readonly string _typeName; + private readonly Func _serialize; + private readonly Func _getIndexForEvent; + private List _payload; + private int _count; + private DateTime _date; + + /// + /// + /// + /// + /// + /// + /// + public ElasticsearchPayloadReader(string pipelineName,string typeName, Func serialize,Func getIndexForEvent) + { + _pipelineName = pipelineName; + _typeName = typeName; + _serialize = serialize; + _getIndexForEvent = getIndexForEvent; + } + + /// + /// + /// + /// + public override List GetNoPayload() + { + return new List(); + } + + /// + /// + /// + /// + protected override void InitPayLoad(string filename) + { + _payload = new List(); + _count = 0; + var lastToken = filename.Split('-').Last(); + + // lastToken should be something like 20150218.json or 20150218_3.json now + if (!lastToken.ToLowerInvariant().EndsWith(".json")) + { + throw new FormatException(string.Format("The file name '{0}' does not seem to follow the right file pattern - it must be named [whatever]-{{Date}}[_n].json", Path.GetFileName(filename))); + } + + var dateString = lastToken.Substring(0, 8); + _date = DateTime.ParseExact(dateString, "yyyyMMdd", CultureInfo.InvariantCulture); + } + /// + /// + /// + /// + protected override List FinishPayLoad() + { + return _payload; + } + + /// + /// + /// + /// + protected override void AddToPayLoad(string nextLine) + { + var indexName = _getIndexForEvent(nextLine, _date); + var action = default(object); + + if (string.IsNullOrWhiteSpace(_pipelineName)) + { + action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid() } }; + } + else + { + action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid(), pipeline = _pipelineName } }; + } + + var actionJson = _serialize(action); + _payload.Add(actionJson); + _payload.Add(nextLine); + _count++; + } + } +} diff --git 
a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchLogClient.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchLogClient.cs new file mode 100644 index 00000000..6b2d45bf --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchLogClient.cs @@ -0,0 +1,125 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.ExceptionServices; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Elasticsearch.Net; +using Serilog.Debugging; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// + /// + public class ElasticsearchLogClient : ILogClient> + { + private readonly IElasticLowLevelClient _elasticLowLevelClient; + private readonly Func _cleanPayload; + + /// + /// + /// + /// + /// + public ElasticsearchLogClient(IElasticLowLevelClient elasticLowLevelClient, + Func cleanPayload) + { + _elasticLowLevelClient = elasticLowLevelClient; + _cleanPayload = cleanPayload; + } + + public async Task SendPayloadAsync(List payload) + { + return await SendPayloadAsync(payload, true); + } + + public async Task SendPayloadAsync(List payload,bool first) + { + try + { + if (payload == null || !payload.Any()) return new SentPayloadResult(null, true); + var response = await _elasticLowLevelClient.BulkAsync(PostData.MultiJson(payload)); + + if (response.Success) + { + var cleanPayload = new List(); + var invalidPayload = GetInvalidPayloadAsync(response, payload,out cleanPayload); + if ((cleanPayload?.Any() ?? false) && first) + { + await SendPayloadAsync(cleanPayload,false); + } + + return new SentPayloadResult(response, true, invalidPayload); + } + else + { + SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}", response.HttpStatusCode, + response.OriginalException); + return new SentPayloadResult(response, false, + new InvalidResult() + { + StatusCode = response.HttpStatusCode ?? 500, + Content = response.OriginalException.ToString() + }); + } + } + catch (Exception ex) + { + SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); + return new SentPayloadResult(null, false, null, ex); + } + + + } + + private InvalidResult GetInvalidPayloadAsync(DynamicResponse baseResult, List payload, out List cleanPayload) + { + int i = 0; + cleanPayload = new List(); + var items = baseResult.Body["items"]; + if (items == null) return null; + List badPayload = new List(); + + bool hasErrors = false; + foreach (dynamic item in items) + { + long? status = item.index?.status; + i++; + if (!status.HasValue || status < 300) + { + continue; + } + + hasErrors = true; + var id = item.index?._id; + var error = item.index?.error; + if (int.TryParse(id.Split('_')[0], out int index)) + { + SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}. Failed payload : {2}.", status, error?.ToString(), payload.ElementAt(index * 2 + 1)); + badPayload.Add(payload.ElementAt(index * 2)); + badPayload.Add(payload.ElementAt(index * 2 + 1)); + if (_cleanPayload != null) + { + cleanPayload.Add(payload.ElementAt(index * 2)); + cleanPayload.Add(_cleanPayload(payload.ElementAt(index * 2 + 1), status, error?.ToString())); + } + } + else + { + SelfLog.WriteLine($"Received failed ElasticSearch shipping result {status}: {error?.ToString()}."); + } + } + + if (!hasErrors) + return null; + return new InvalidResult() + { + StatusCode = baseResult.HttpStatusCode ?? 
500, + Content = baseResult.ToString(), + BadPayLoad = String.Join(Environment.NewLine, badPayload) + }; + } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchLogShipper.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchLogShipper.cs new file mode 100644 index 00000000..6a6b8b26 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchLogShipper.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Serilog.Core; +using Serilog.Debugging; +using Serilog.Sinks.Elasticsearch.Durable; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// + /// + public class ElasticsearchLogShipper : LogShipper> + { + private readonly Action _registerTemplateIfNeeded; + bool _didRegisterTemplateIfNeeded = false; + + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public ElasticsearchLogShipper(string bufferBaseFilename, int batchPostingLimit, TimeSpan period, + long? eventBodyLimitBytes, LoggingLevelSwitch levelControlSwitch, ILogClient> logClient, + IPayloadReader> payloadReader, long? retainedInvalidPayloadsLimitBytes, + long? bufferSizeLimitBytes, Action registerTemplateIfNeeded) + : base(bufferBaseFilename, batchPostingLimit, period, eventBodyLimitBytes, + levelControlSwitch, logClient, payloadReader, retainedInvalidPayloadsLimitBytes, bufferSizeLimitBytes) + { + _registerTemplateIfNeeded = registerTemplateIfNeeded; + } + + /// + /// + /// + /// + protected override async Task OnTick() + { + bool success = true; + try + { + if (!_didRegisterTemplateIfNeeded) + { + if (_registerTemplateIfNeeded != null) + { + _registerTemplateIfNeeded(); + _didRegisterTemplateIfNeeded = true; + } + } + } + catch (Exception ex) + { + SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); + _connectionSchedule.MarkFailure(); + success = false; + } + if (success) + await base.OnTick(); + } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchPayloadReader.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchPayloadReader.cs new file mode 100644 index 00000000..a31b253e --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ElasticSearch/ElasticsearchPayloadReader.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// + /// + public class ElasticsearchPayloadReader: APayloadReader> + { + private readonly string _pipelineName; + private readonly string _typeName; + private readonly Func _serialize; + private readonly Func _getIndexForEvent; + private List _payload; + private int _count; + private DateTime _date; + + /// + /// + /// + /// + /// + /// + /// + public ElasticsearchPayloadReader(string pipelineName,string typeName, Func serialize,Func getIndexForEvent) + { + _pipelineName = pipelineName; + _typeName = typeName; + _serialize = serialize; + _getIndexForEvent = getIndexForEvent; + } + + /// + /// + /// + /// + public override List GetNoPayload() + { + return new List(); + } + + /// + /// + /// + /// + protected override void InitPayLoad(string filename) + { + _payload = new List(); + _count = 0; + var 
lastToken = filename.Split('-').Last(); + + // lastToken should be something like 20150218.json or 20150218_3.json now + if (!lastToken.ToLowerInvariant().EndsWith(".json")) + { + throw new FormatException(string.Format("The file name '{0}' does not seem to follow the right file pattern - it must be named [whatever]-{{Date}}[_n].json", Path.GetFileName(filename))); + } + + var dateString = lastToken.Substring(0, 8); + _date = DateTime.ParseExact(dateString, "yyyyMMdd", CultureInfo.InvariantCulture); + } + /// + /// + /// + /// + protected override List FinishPayLoad() + { + return _payload; + } + + /// + /// + /// + /// + protected override void AddToPayLoad(string nextLine) + { + var indexName = _getIndexForEvent(nextLine, _date); + var action = default(object); + + if (string.IsNullOrWhiteSpace(_pipelineName)) + { + action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid() } }; + } + else + { + action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid(), pipeline = _pipelineName } }; + } + + var actionJson = _serialize(action); + _payload.Add(actionJson); + _payload.Add(nextLine); + _count++; + } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ExponentialBackoffConnectionSchedule.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ExponentialBackoffConnectionSchedule.cs similarity index 91% rename from src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ExponentialBackoffConnectionSchedule.cs rename to src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ExponentialBackoffConnectionSchedule.cs index 95b18d34..a0a7804f 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ExponentialBackoffConnectionSchedule.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ExponentialBackoffConnectionSchedule.cs @@ -14,12 +14,13 @@ using System; -namespace Serilog.Sinks.Elasticsearch +namespace Serilog.Sinks.Elasticsearch.Durable { /// /// Based on the BatchedConnectionStatus class from . + /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/ExponentialBackoffConnectionSchedule.cs /// - class ExponentialBackoffConnectionSchedule + public class ExponentialBackoffConnectionSchedule { static readonly TimeSpan MinimumBackoffPeriod = TimeSpan.FromSeconds(5); static readonly TimeSpan MaximumBackoffInterval = TimeSpan.FromMinutes(10); diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/FileSet.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/FileSet.cs new file mode 100644 index 00000000..133eb79e --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/FileSet.cs @@ -0,0 +1,129 @@ +// Serilog.Sinks.Seq Copyright 2017 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#if DURABLE + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Text.RegularExpressions; +using Serilog.Debugging; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/Durable/FileSet.cs + /// + class FileSet + { + readonly string _bookmarkFilename; + readonly string _candidateSearchPath; + readonly string _logFolder; + readonly Regex _filenameMatcher; + + const string InvalidPayloadFilePrefix = "invalid-"; + + public FileSet(string bufferBaseFilename) + { + if (bufferBaseFilename == null) throw new ArgumentNullException(nameof(bufferBaseFilename)); + + _bookmarkFilename = Path.GetFullPath(bufferBaseFilename + ".bookmark"); + _logFolder = Path.GetDirectoryName(_bookmarkFilename); + _candidateSearchPath = Path.GetFileName(bufferBaseFilename) + "-*.json"; + _filenameMatcher = new Regex("^" + Regex.Escape(Path.GetFileName(bufferBaseFilename)) + "-(?\\d{8})(?_[0-9]{3,}){0,1}\\.json$"); + } + + public BookmarkFile OpenBookmarkFile() + { + return new BookmarkFile(_bookmarkFilename); + } + + public string[] GetBufferFiles() + { + return Directory.GetFiles(_logFolder, _candidateSearchPath) + .Select(n => new KeyValuePair(n, _filenameMatcher.Match(Path.GetFileName(n)))) + .Where(nm => nm.Value.Success) + .OrderBy(nm => nm.Value.Groups["date"].Value, StringComparer.OrdinalIgnoreCase) + .ThenBy(nm => int.Parse("0" + nm.Value.Groups["sequence"].Value.Replace("_", ""))) + .Select(nm => nm.Key) + .ToArray(); + } + + public void CleanUpBufferFiles(long bufferSizeLimitBytes, int alwaysRetainCount) + { + try + { + var bufferFiles = GetBufferFiles(); + Array.Reverse(bufferFiles); + DeleteExceedingCumulativeSize(bufferFiles.Select(f => new FileInfo(f)), bufferSizeLimitBytes, 2); + } + catch (Exception ex) + { + SelfLog.WriteLine("Exception thrown while cleaning up buffer files: {0}", ex); + } + } + + public string MakeInvalidPayloadFilename(int statusCode) + { + var invalidPayloadFilename = $"{InvalidPayloadFilePrefix}{statusCode}-{Guid.NewGuid():n}.json"; + return Path.Combine(_logFolder, invalidPayloadFilename); + } + + public void CleanUpInvalidPayloadFiles(long maxNumberOfBytesToRetain) + { + try + { + var candidateFiles = from file in Directory.EnumerateFiles(_logFolder, $"{InvalidPayloadFilePrefix}*.json") + let candiateFileInfo = new FileInfo(file) + orderby candiateFileInfo.LastWriteTimeUtc descending + select candiateFileInfo; + + DeleteExceedingCumulativeSize(candidateFiles, maxNumberOfBytesToRetain, 0); + } + catch (Exception ex) + { + SelfLog.WriteLine("Exception thrown while cleaning up invalid payload files: {0}", ex); + } + } + + static void DeleteExceedingCumulativeSize(IEnumerable files, long maxNumberOfBytesToRetain, int alwaysRetainCount) + { + long cumulative = 0; + var i = 0; + foreach (var file in files) + { + cumulative += file.Length; + + if (i++ < alwaysRetainCount) + continue; + + if (cumulative <= maxNumberOfBytesToRetain) + continue; + + try + { + file.Delete(); + } + catch (Exception ex) + { + SelfLog.WriteLine("Exception thrown while trying to delete file {0}: {1}", file.FullName, ex); + } + } + } + } +} + +#endif diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/FileSetPosition.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/FileSetPosition.cs new file mode 100644 index 00000000..afe8970f --- /dev/null +++ 
b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/FileSetPosition.cs @@ -0,0 +1,38 @@ +// Serilog.Sinks.Seq Copyright 2017 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if DURABLE + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/Durable/FileSetPosition.cs + /// + public struct FileSetPosition + { + public string File { get; } + + public long NextLineStart { get; } + + public FileSetPosition(long nextLineStart, string file) + { + NextLineStart = nextLineStart; + File = file; + } + + public static readonly FileSetPosition None = default(FileSetPosition); + } +} + +#endif diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ILogClient.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ILogClient.cs new file mode 100644 index 00000000..a7dd41e0 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/ILogClient.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// A wrapper client which talk to the log server + /// + /// + public interface ILogClient + { + Task SendPayloadAsync(TPayload payload); + } + public class SentPayloadResult + { + + public dynamic BaseResult { get; } + public bool Success { get; } + public InvalidResult InvalidResult { get; } + + + public Exception Exception { get; } + + public SentPayloadResult(dynamic baseResult, bool success, InvalidResult invalidResult =null, Exception exception=null) + { + BaseResult = baseResult; + Success = success; + InvalidResult = invalidResult; + Exception = exception; + } + + + } + + public class InvalidResult + { + public int StatusCode { get; set; } + public string Content { get; set; } + public string BadPayLoad { get; set; } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/IPayloadReader.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/IPayloadReader.cs new file mode 100644 index 00000000..d759cd68 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/IPayloadReader.cs @@ -0,0 +1,12 @@ +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// Reads logs from logfiles and formats it for logserver + /// + /// + public interface IPayloadReader + { + TPayload ReadPayload(int batchPostingLimit, long? 
eventBodyLimitBytes, ref FileSetPosition position, ref int count,string fileName); + TPayload GetNoPayload(); + } +} \ No newline at end of file diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/LogShipper.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/LogShipper.cs new file mode 100644 index 00000000..a9a49666 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/LogShipper.cs @@ -0,0 +1,291 @@ +// Serilog.Sinks.Seq Copyright 2017 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if DURABLE + +using System; +using System.IO; +using System.Linq; +using System.Text; +using Serilog.Core; +using Serilog.Debugging; +using Serilog.Events; +using IOFile = System.IO.File; +using System.Threading.Tasks; +#if HRESULTS +using System.Runtime.InteropServices; +#endif + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// Reads and sends logdata to log server + /// Generic version of https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/Durable/HttpLogShipper.cs + /// + /// + public class LogShipper : IDisposable + { + private static readonly TimeSpan RequiredLevelCheckInterval = TimeSpan.FromMinutes(2); + + readonly int _batchPostingLimit; + readonly long? _eventBodyLimitBytes; + private readonly ILogClient _logClient; + private readonly IPayloadReader _payloadReader; + readonly FileSet _fileSet; + readonly long? _retainedInvalidPayloadsLimitBytes; + readonly long? _bufferSizeLimitBytes; + + // Timer thread only + + /// + /// + /// + protected readonly ExponentialBackoffConnectionSchedule _connectionSchedule; + DateTime _nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval); + + // Synchronized + readonly object _stateLock = new object(); + + readonly PortableTimer _timer; + + // Concurrent + readonly ControlledLevelSwitch _controlledSwitch; + + volatile bool _unloading; + + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public LogShipper( + string bufferBaseFilename, + int batchPostingLimit, + TimeSpan period, + long? eventBodyLimitBytes, + LoggingLevelSwitch levelControlSwitch, + ILogClient logClient, + IPayloadReader payloadReader, + long? retainedInvalidPayloadsLimitBytes, + long? 
bufferSizeLimitBytes) + { + _batchPostingLimit = batchPostingLimit; + _eventBodyLimitBytes = eventBodyLimitBytes; + _logClient = logClient; + _payloadReader = payloadReader; + _controlledSwitch = new ControlledLevelSwitch(levelControlSwitch); + _connectionSchedule = new ExponentialBackoffConnectionSchedule(period); + _retainedInvalidPayloadsLimitBytes = retainedInvalidPayloadsLimitBytes; + _bufferSizeLimitBytes = bufferSizeLimitBytes; + _fileSet = new FileSet(bufferBaseFilename); + _timer = new PortableTimer(c => OnTick()); + SetTimer(); + + } + + void CloseAndFlush() + { + lock (_stateLock) + { + if (_unloading) + return; + + _unloading = true; + } + + _timer.Dispose(); + + OnTick().GetAwaiter().GetResult(); + } + + /// + /// + /// + /// + /// + public bool IsIncluded(LogEvent logEvent) + { + return _controlledSwitch.IsIncluded(logEvent); + } + + /// + public void Dispose() + { + CloseAndFlush(); + } + + protected void SetTimer() + { + // Note, called under _stateLock + _timer.Start(_connectionSchedule.NextInterval); + } + + /// + /// + /// + /// + protected virtual async Task OnTick() + { + try + { + int count; + do + { + count = 0; + + using (var bookmarkFile = _fileSet.OpenBookmarkFile()) + { + var position = bookmarkFile.TryReadBookmark(); + var files = _fileSet.GetBufferFiles(); + + if (position.File == null || !IOFile.Exists(position.File)) + { + position = new FileSetPosition(0, files.FirstOrDefault()); + } + + TPayload payload; + if (position.File == null) + { + payload = _payloadReader.GetNoPayload(); + count = 0; + } + else + { + payload = _payloadReader.ReadPayload(_batchPostingLimit, _eventBodyLimitBytes, ref position, ref count,position.File); + } + + if (count > 0 || _controlledSwitch.IsActive && _nextRequiredLevelCheckUtc < DateTime.UtcNow) + { + _nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval); + + var result = await _logClient.SendPayloadAsync(payload).ConfigureAwait(false); + if (result.Success) + { + _connectionSchedule.MarkSuccess(); + if(result.InvalidResult!=null) + DumpInvalidPayload(result.InvalidResult.StatusCode, result.InvalidResult.Content, result.InvalidResult.BadPayLoad); + bookmarkFile.WriteBookmark(position); + } + else + { + _connectionSchedule.MarkFailure(); + if (_bufferSizeLimitBytes.HasValue) + _fileSet.CleanUpBufferFiles(_bufferSizeLimitBytes.Value, 2); + + break; + } + } + else if (position.File == null) + { + break; + } + else + { + // For whatever reason, there's nothing waiting to send. This means we should try connecting again at the + // regular interval, so mark the attempt as successful. + _connectionSchedule.MarkSuccess(); + + // Only advance the bookmark if no other process has the + // current file locked, and its length is as we found it. + if (files.Length == 2 && files.First() == position.File && + FileIsUnlockedAndUnextended(position)) + { + bookmarkFile.WriteBookmark(new FileSetPosition(0, files[1])); + } + + if (files.Length > 2) + { + // By this point, we expect writers to have relinquished locks + // on the oldest file. 
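`LogShipper<TPayload>` only talks to the log server through `ILogClient`, and the branch above shows how it interprets the result: `Success` resets the backoff schedule and advances the bookmark, a non-null `InvalidResult` is dumped to an `invalid-*` file, and a failure triggers backoff plus optional buffer cleanup. The sketch below is a hypothetical, simplified HTTP client showing one way an implementation might map responses onto that contract; the class name, endpoint handling and status-code policy are assumptions, not the sink's actual Elasticsearch client.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Serilog.Sinks.Elasticsearch.Durable;

// Illustrative only: one way to satisfy ILogClient<T> so that LogShipper can tell
// "retry later" failures apart from "payload rejected" results.
class HttpBulkLogClient : ILogClient<List<string>>
{
    readonly HttpClient _http = new HttpClient();
    readonly Uri _endpoint;

    public HttpBulkLogClient(Uri endpoint) => _endpoint = endpoint;

    public async Task<SentPayloadResult> SendPayloadAsync(List<string> payload)
    {
        try
        {
            var body = string.Join("\n", payload) + "\n";
            var response = await _http.PostAsync(_endpoint,
                new StringContent(body, Encoding.UTF8, "application/json"));

            if (response.IsSuccessStatusCode)
                return new SentPayloadResult(null, success: true);

            var content = await response.Content.ReadAsStringAsync();
            var status = (int)response.StatusCode;

            if (status >= 400 && status < 500)
            {
                // The server rejected this payload. Report success so the bookmark advances
                // past it, and attach InvalidResult so LogShipper dumps it to an invalid-* file.
                return new SentPayloadResult(null, success: true,
                    invalidResult: new InvalidResult { StatusCode = status, Content = content, BadPayLoad = body });
            }

            // Server-side or transient error: mark as failed so the shipper backs off and retries.
            return new SentPayloadResult(null, success: false);
        }
        catch (Exception ex)
        {
            // Transport failure: not successful, so the shipper backs off and retries.
            return new SentPayloadResult(null, success: false, exception: ex);
        }
    }
}
```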
+ IOFile.Delete(files[0]); + } + } + } + } while (count == _batchPostingLimit); + } + catch (Exception ex) + { + _connectionSchedule.MarkFailure(); + SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); + + if (_bufferSizeLimitBytes.HasValue) + _fileSet.CleanUpBufferFiles(_bufferSizeLimitBytes.Value, 2); + } + finally + { + lock (_stateLock) + { + if (!_unloading) + SetTimer(); + } + } + } + + + + void DumpInvalidPayload(int statusCode,string resultContent, string payload) + { + var invalidPayloadFile = _fileSet.MakeInvalidPayloadFilename(statusCode); + SelfLog.WriteLine("HTTP shipping failed with {0}: {1}; dumping payload to {2}", statusCode, + resultContent, invalidPayloadFile); + var bytesToWrite = Encoding.UTF8.GetBytes(payload); + if (_retainedInvalidPayloadsLimitBytes.HasValue) + { + _fileSet.CleanUpInvalidPayloadFiles(_retainedInvalidPayloadsLimitBytes.Value - bytesToWrite.Length); + } + IOFile.WriteAllBytes(invalidPayloadFile, bytesToWrite); + } + + static bool FileIsUnlockedAndUnextended(FileSetPosition position) + { + try + { + using (var fileStream = IOFile.Open(position.File, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read)) + { + return fileStream.Length <= position.NextLineStart; + } + } +#if HRESULTS + catch (IOException ex) + { + var errorCode = Marshal.GetHRForException(ex) & ((1 << 16) - 1); + if (errorCode != 32 && errorCode != 33) + { + SelfLog.WriteLine("Unexpected I/O exception while testing locked status of {0}: {1}", position.File, ex); + } + } +#else + catch (IOException) + { + // Where no HRESULT is available, assume IOExceptions indicate a locked file + } +#endif + catch (Exception ex) + { + SelfLog.WriteLine("Unexpected exception while testing locked status of {0}: {1}", position.File, ex); + } + + return false; + } + } +} + +#endif diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/PortableTimer.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/PortableTimer.cs new file mode 100644 index 00000000..9d823049 --- /dev/null +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/PortableTimer.cs @@ -0,0 +1,142 @@ +// Copyright 2013-2016 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
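`DumpInvalidPayload` above treats `BufferRetainedInvalidPayloadsLimitBytes` as a soft cap: before writing a new `invalid-{statusCode}-{guid}.json` file it asks `CleanUpInvalidPayloadFiles` to trim older invalid files down to the limit minus the size of the incoming payload, so a single oversized payload can still exceed the cap but is then the only thing retained beyond it. A small worked example with made-up numbers:

```csharp
// Suppose BufferRetainedInvalidPayloadsLimitBytes = 100_000 and three older invalid files
// of 40_000 bytes each sit on disk, newest first: A, B, C. A new rejected payload of
// 30_000 bytes arrives.
long limit = 100_000;
long newPayloadBytes = 30_000;

// CleanUpInvalidPayloadFiles is called with the remaining budget for the old files:
long budgetForOldFiles = limit - newPayloadBytes;   // 70_000

// Walking newest to oldest and accumulating sizes: A (40_000) fits, A + B (80_000) exceeds
// 70_000, so B and C are deleted. After the new file is written, roughly
// 40_000 + 30_000 = 70_000 bytes of invalid payloads remain, which is under the soft limit.
```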
+ +using Serilog.Debugging; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Serilog.Sinks.Elasticsearch.Durable +{ + /// + /// https://github.com/serilog/serilog-sinks-seq/blob/v4.0.0/src/Serilog.Sinks.Seq/Sinks/Seq/PortableTimer.cs + /// + class PortableTimer : IDisposable + { + readonly object _stateLock = new object(); + + readonly Func _onTick; + readonly CancellationTokenSource _cancel = new CancellationTokenSource(); + +#if THREADING_TIMER + readonly Timer _timer; +#endif + + bool _running; + bool _disposed; + + public PortableTimer(Func onTick) + { + if (onTick == null) throw new ArgumentNullException(nameof(onTick)); + + _onTick = onTick; + +#if THREADING_TIMER + _timer = new Timer(_ => OnTick(), null, Timeout.Infinite, Timeout.Infinite); +#endif + } + + public void Start(TimeSpan interval) + { + if (interval < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(interval)); + + lock (_stateLock) + { + if (_disposed) + throw new ObjectDisposedException(nameof(PortableTimer)); + +#if THREADING_TIMER + _timer.Change(interval, Timeout.InfiniteTimeSpan); +#else + Task.Delay(interval, _cancel.Token) + .ContinueWith( + _ => OnTick(), + CancellationToken.None, + TaskContinuationOptions.DenyChildAttach, + TaskScheduler.Default); +#endif + } + } + + async void OnTick() + { + try + { + lock (_stateLock) + { + if (_disposed) + { + return; + } + + // There's a little bit of raciness here, but it's needed to support the + // current API, which allows the tick handler to reenter and set the next interval. + + if (_running) + { + Monitor.Wait(_stateLock); + + if (_disposed) + { + return; + } + } + + _running = true; + } + + if (!_cancel.Token.IsCancellationRequested) + { + await _onTick(_cancel.Token); + } + } + catch (OperationCanceledException tcx) + { + SelfLog.WriteLine("The timer was canceled during invocation: {0}", tcx); + } + finally + { + lock (_stateLock) + { + _running = false; + Monitor.PulseAll(_stateLock); + } + } + } + + public void Dispose() + { + _cancel.Cancel(); + + lock (_stateLock) + { + if (_disposed) + { + return; + } + + while (_running) + { + Monitor.Wait(_stateLock); + } + +#if THREADING_TIMER + _timer.Dispose(); +#endif + + _disposed = true; + } + } + } +} diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/DurableElasticSearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/DurableElasticSearchSink.cs deleted file mode 100644 index 20b3fa5f..00000000 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/DurableElasticSearchSink.cs +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2014 Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using Serilog.Core; -using Serilog.Events; -using Serilog.Sinks.RollingFile; - -namespace Serilog.Sinks.Elasticsearch -{ - class DurableElasticsearchSink : ILogEventSink, IDisposable - { - // we rely on the date in the filename later! 
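`PortableTimer` is single-shot: `Start` schedules exactly one tick, the tick handler is guarded against reentrancy, and whoever owns the timer must re-arm it, which is why `LogShipper.OnTick` calls `SetTimer()` again in its `finally` block. The sketch below shows that usage pattern with a hypothetical consumer; `PortableTimer` is internal to the sink, so this is purely illustrative of the pattern rather than code you could compile against the package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Serilog.Sinks.Elasticsearch.Durable;

// Illustrative only: a periodic job built on the single-shot PortableTimer.
class PeriodicJob : IDisposable
{
    readonly PortableTimer _timer;
    readonly TimeSpan _interval = TimeSpan.FromSeconds(5);
    volatile bool _disposed;

    public PeriodicJob()
    {
        _timer = new PortableTimer(OnTickAsync);
        _timer.Start(_interval);            // schedules the first tick only
    }

    async Task OnTickAsync(CancellationToken cancel)
    {
        try
        {
            await DoWorkAsync(cancel);
        }
        finally
        {
            // PortableTimer never repeats on its own; re-arm unless we are shutting down,
            // roughly as LogShipper guards with its _unloading flag.
            if (!_disposed)
                _timer.Start(_interval);
        }
    }

    Task DoWorkAsync(CancellationToken cancel) => Task.CompletedTask;

    public void Dispose()
    {
        _disposed = true;
        _timer.Dispose();
    }
}
```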
- const string FileNameSuffix = "-{Date}.json"; - - readonly RollingFileSink _sink; - readonly ElasticsearchLogShipper _shipper; - readonly ElasticsearchSinkState _state; - - public DurableElasticsearchSink(ElasticsearchSinkOptions options) - { - _state = ElasticsearchSinkState.Create(options); - - if (string.IsNullOrWhiteSpace(options.BufferBaseFilename)) - { - throw new ArgumentException("Cannot create the durable ElasticSearch sink without a buffer base file name!"); - } - - _sink = new RollingFileSink( - options.BufferBaseFilename + FileNameSuffix, - _state.DurableFormatter, - options.BufferFileSizeLimitBytes, - null); - - _shipper = new ElasticsearchLogShipper(_state); - } - - public void Emit(LogEvent logEvent) - { - _sink.Emit(logEvent); - } - - public void Dispose() - { - _sink.Dispose(); - _shipper.Dispose(); - } - } -} \ No newline at end of file diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchLogShipper.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchLogShipper.cs deleted file mode 100644 index 22b0a768..00000000 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchLogShipper.cs +++ /dev/null @@ -1,441 +0,0 @@ -// Copyright 2014 Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using System.Collections.Generic; -using System.Globalization; -using System.IO; -using System.Linq; -using System.Runtime.InteropServices; -using System.Text; -using System.Threading; -using Serilog.Debugging; -using Elasticsearch.Net; - -#if NO_TIMER -using Serilog.Sinks.Elasticsearch.CrossPlatform; -#endif - -namespace Serilog.Sinks.Elasticsearch -{ - internal class ElasticsearchLogShipper : IDisposable - { - private readonly ElasticsearchSinkState _state; - - readonly int _batchPostingLimit; - readonly int _singleEventSizePostingLimit; -#if NO_TIMER - readonly PortableTimer _timer; -#else - readonly Timer _timer; -#endif - - readonly ExponentialBackoffConnectionSchedule _connectionSchedule; - readonly object _stateLock = new object(); - volatile bool _unloading; - readonly string _bookmarkFilename; - readonly string _logFolder; - readonly string _candidateSearchPath; - - bool _didRegisterTemplateIfNeeded; - - internal ElasticsearchLogShipper(ElasticsearchSinkOptions option) - { - _batchPostingLimit = option.BatchPostingLimit; - _singleEventSizePostingLimit = option.SingleEventSizePostingLimit; - _state = ElasticsearchSinkState.Create(option); - } - - internal ElasticsearchLogShipper(ElasticsearchSinkState state) - { - - _state = state; - _connectionSchedule = new ExponentialBackoffConnectionSchedule(_state.Options.BufferLogShippingInterval ?? 
TimeSpan.FromSeconds(5)); - - _batchPostingLimit = _state.Options.BatchPostingLimit; - _singleEventSizePostingLimit = _state.Options.SingleEventSizePostingLimit; - _bookmarkFilename = Path.GetFullPath(_state.Options.BufferBaseFilename + ".bookmark"); - _logFolder = Path.GetDirectoryName(_bookmarkFilename); - _candidateSearchPath = Path.GetFileName(_state.Options.BufferBaseFilename) + "*.json"; - -#if NO_TIMER - _timer = new PortableTimer(cancel => OnTick()); -#else - _timer = new Timer(s => OnTick(), null, -1, -1); -#endif - SetTimer(); - } - - void CloseAndFlush() - { - lock (_stateLock) - { - if (_unloading) - return; - - _unloading = true; - } - - -#if NO_TIMER - _timer.Dispose(); -#else - - var wh = new ManualResetEvent(false); - if (_timer.Dispose(wh)) - wh.WaitOne(); -#endif - - OnTick(); - } - - /// - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// - /// 2 - public void Dispose() - { - Dispose(true); - } - - /// - /// Free resources held by the sink. - /// - /// If true, called because the object is being disposed; if false, - /// the object is being disposed from the finalizer. - protected virtual void Dispose(bool disposing) - { - if (!disposing) return; - CloseAndFlush(); - } - - void SetTimer() - { - // Note, called under _stateLock - var infiniteTimespan = Timeout.InfiniteTimeSpan; -#if NO_TIMER - _timer.Start(_connectionSchedule.NextInterval); -#else - _timer.Change(_connectionSchedule.NextInterval, infiniteTimespan); -#endif - } - - void OnTick() - { - try - { - // on the very first timer tick, we make the auto-register-if-necessary call - if (!_didRegisterTemplateIfNeeded) - { - _state.RegisterTemplateIfNeeded(); - _didRegisterTemplateIfNeeded = true; - } - - var hasData = false; - - do - { - // Locking the bookmark ensures that though there may be multiple instances of this - // class running, only one will ship logs at a time. - - using (var bookmark = System.IO.File.Open(_bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read)) - { - TryReadBookmark(bookmark, out long currentLineBeginsAtOffset, out string currentFilePath); - var fileSet = GetFileSet(); - - if (currentFilePath == null || !System.IO.File.Exists(currentFilePath)) - { - currentLineBeginsAtOffset = 0; - currentFilePath = fileSet.FirstOrDefault(); - } - - if (currentFilePath == null) continue; - - hasData = false; - // file name pattern: whatever-bla-bla-20150218.json, whatever-bla-bla-20150218_1.json, etc. 
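The deleted shipper's comment above points at the concurrency model that carries over to the rewrite: the bookmark file doubles as a cross-process lock, because it is opened for read/write while sharing only read access, so a second shipper instance gets an `IOException` and simply skips that shipping round. A minimal standalone illustration of that locking behaviour (the path is a placeholder):

```csharp
using System;
using System.IO;

class BookmarkLockDemo
{
    static void Main()
    {
        var path = Path.Combine(Path.GetTempPath(), "demo.bookmark");   // illustrative path

        using (var first = File.Open(path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read))
        {
            try
            {
                // A second read/write handle is denied while the first shares only Read,
                // so only one shipper instance can advance the bookmark at a time.
                using (File.Open(path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read)) { }
            }
            catch (IOException)
            {
                Console.WriteLine("Bookmark is held by another shipper; skip this round.");
            }
        }
    }
}
```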
- var lastToken = currentFilePath.Split('-').Last(); - - // lastToken should be something like 20150218.json or 20150218_3.json now - if (!lastToken.ToLowerInvariant().EndsWith(".json")) - { - throw new FormatException(string.Format("The file name '{0}' does not seem to follow the right file pattern - it must be named [whatever]-{{Date}}[_n].json", Path.GetFileName(currentFilePath))); - } - - var dateString = lastToken.Substring(0, 8); - var date = DateTime.ParseExact(dateString, "yyyyMMdd", CultureInfo.InvariantCulture); - var indexName = _state.GetIndexForEvent(null, date); - - var payload = new List(); - var nextLineBeginsAtOffset = currentLineBeginsAtOffset; - - using (var current = System.IO.File.Open(currentFilePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) - { - nextLineBeginsAtOffset = CreatePayLoad(current, payload, indexName, currentLineBeginsAtOffset, currentFilePath); - } - - if (nextLineBeginsAtOffset > currentLineBeginsAtOffset) - { - hasData = true; - - if (!payload.Any()) - { - // Nothing to send, all events have been skipped, just write next offset to the bookmark - WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFilePath); - continue; - } - - var response = _state.Client.Bulk(PostData.MultiJson(payload)); - - if (response.Success) - { - WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFilePath); - _connectionSchedule.MarkSuccess(); - - HandleBulkResponse(response,payload); - } - else - { - _connectionSchedule.MarkFailure(); - SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}", response.HttpStatusCode, response.OriginalException); - break; - } - } - else - { - // For whatever reason, there's nothing waiting to send. This means we should try connecting again at the - // regular interval, so mark the attempt as successful. - _connectionSchedule.MarkSuccess(); - - // Only advance the bookmark if no other process has the - // current file locked, and its length is as we found it. - - if (fileSet.Length == 2 && fileSet.First() == currentFilePath && IsUnlockedAtLength(currentFilePath, nextLineBeginsAtOffset)) - { - WriteBookmark(bookmark, 0, fileSet[1]); - } - - if (fileSet.Length > 2) - { - // Once there's a third file waiting to ship, we do our - // best to move on, though a lock on the current file - // will delay this. - - System.IO.File.Delete(fileSet[0]); - } - } - } - } - while (hasData); - } - catch (Exception ex) - { - SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); - _connectionSchedule.MarkFailure(); - } - finally - { - lock (_stateLock) - { - if (!_unloading) - { - SetTimer(); - } - } - } - } - - private long CreatePayLoad( - Stream current, - List payload, - string indexName, - long currentLineBeginsAtOffset, - string currentFilePath) - { - current.Position = currentLineBeginsAtOffset; - - var count = 0; - string nextLine; - var currentPosition = current.Position; - var nextPosition = currentPosition; - while (count < _batchPostingLimit && TryReadLine(current, ref nextPosition, out nextLine)) - { - if (_singleEventSizePostingLimit > 0) - { - if (nextLine.Length > _singleEventSizePostingLimit) - { - SelfLog.WriteLine( - "File: {0}. Skip sending to ElasticSearch event at position offset {1}. 
Reason: {2}.", - currentFilePath, - currentPosition, - $"Event has line length {nextLine.Length} over limit of {_singleEventSizePostingLimit}"); - - currentPosition = nextPosition; - continue; - } - - currentPosition = nextPosition; - } - - var action = default(object); - - if (string.IsNullOrWhiteSpace(_state.Options.PipelineName)) - { - action = new { index = new { _index = indexName, _type = _state.Options.TypeName, _id = count + "_" + Guid.NewGuid() } }; - } - else - { - action = new { index = new { _index = indexName, _type = _state.Options.TypeName, _id = count + "_" + Guid.NewGuid(), pipeline = _state.Options.PipelineName } }; - } - - var actionJson = _state.Serialize(action); - payload.Add(actionJson); - payload.Add(nextLine); - ++count; - } - - return nextPosition; - } - - static void HandleBulkResponse(DynamicResponse response, List payload) - { - int i = 0; - var items = response.Body["items"]; - if (items == null) return; - foreach (dynamic item in items) - { - long? status = item.index?.status; - i++; - if (!status.HasValue || status < 300) - { - continue; - } - - var id = item.index?._id; - var error = item.index?.error; - if (int.TryParse(id.Split('_')[0], out int index)) - { - SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}. Failed payload : {2}.", - status, error.ToString(), - payload.ElementAt(index * 2 + 1)); - } - else - SelfLog.WriteLine("Received failed ElasticSearch shipping result {0}: {1}.", - status, error.ToString()); - } - } - - static bool IsUnlockedAtLength(string file, long maxLen) - { - try - { - using (var fileStream = System.IO.File.Open(file, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read)) - { - return fileStream.Length <= maxLen; - } - } - catch (IOException ex) - { - var errorCode = Marshal.GetHRForException(ex) & ((1 << 16) - 1); - if (errorCode != 32 && errorCode != 33) - { - SelfLog.WriteLine("Unexpected I/O exception while testing locked status of {0}: {1}", file, ex); - } - } - catch (Exception ex) - { - SelfLog.WriteLine("Unexpected exception while testing locked status of {0}: {1}", file, ex); - } - - return false; - } - - static void WriteBookmark(FileStream bookmark, long nextLineBeginsAtOffset, string currentFile) - { - using (var writer = new StreamWriter(bookmark)) - { - writer.WriteLine("{0}:::{1}", nextLineBeginsAtOffset, currentFile); - } - } - - // It would be ideal to chomp whitespace here, but not required. 
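The legacy bookmark format removed here is a single text line of the form `<next line offset>:::<current buffer file>`, written by `WriteBookmark` above and parsed back by `TryReadBookmark` further down. A tiny round-trip example with made-up values:

```csharp
using System;

// What the old WriteBookmark produced (offset and path are invented for illustration):
//   5713:::C:\Temp\myapp-buffer-20150218.json
var line = @"5713:::C:\Temp\myapp-buffer-20150218.json";
var parts = line.Split(new[] { ":::" }, StringSplitOptions.RemoveEmptyEntries);
long nextLineBeginsAtOffset = long.Parse(parts[0]);   // 5713: resume reading the buffer here
string currentFile = parts[1];                        // the buffer file currently being shipped
```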
- internal static bool TryReadLine(Stream current, ref long nextStart, out string nextLine) - { - bool includesBom = false; - if (nextStart == 0 && current.Length >= 3) - { - var preamble = Encoding.UTF8.GetPreamble(); - byte[] readBytes = new byte[3]; - current.Position = 0; - current.Read(readBytes, 0, 3); - - includesBom = !preamble.Where((p, i) => p != readBytes[i]).Any(); - } - - if (current.Length <= nextStart) - { - nextLine = null; - return false; - } - - current.Position = nextStart; - - // Important not to dispose this StreamReader as the stream must remain open (and we can't use the overload with 'leaveOpen' as it's not available in .NET4 - var reader = new StreamReader(current, Encoding.UTF8, false, 128); - nextLine = reader.ReadLine(); - - if (nextLine == null) - return false; - - nextStart += Encoding.UTF8.GetByteCount(nextLine) + Encoding.UTF8.GetByteCount(Environment.NewLine); - if (includesBom) - nextStart += 3; - - return true; - } - - static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, out string currentFile) - { - nextLineBeginsAtOffset = 0; - currentFile = null; - - if (bookmark.Length != 0) - { - string current; - - // Important not to dispose this StreamReader as the stream must remain open (and we can't use the overload with 'leaveOpen' as it's not available in .NET4 - var reader = new StreamReader(bookmark, Encoding.UTF8, false, 128); - current = reader.ReadLine(); - - if (current != null) - { - bookmark.Position = 0; - var parts = current.Split(new[] { ":::" }, StringSplitOptions.RemoveEmptyEntries); - if (parts.Length == 2) - { - nextLineBeginsAtOffset = long.Parse(parts[0]); - currentFile = parts[1]; - } - } - - } - } - - string[] GetFileSet() - { - return Directory.GetFiles(_logFolder, _candidateSearchPath) - .OrderBy(n => n) - .ToArray(); - } - } -} \ No newline at end of file diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs index 6bce6e25..4724d146 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs @@ -114,9 +114,9 @@ public class ElasticsearchSinkOptions public int BatchPostingLimit { get; set; } /// - /// The maximum length of a an event record to be sent. Defaults to: 0 (No Limit) + /// The maximum length of a an event record to be sent. Defaults to: null (No Limit) only used in file buffer mode /// - public int SingleEventSizePostingLimit { get; set; } + public long? SingleEventSizePostingLimit { get; set; } /// /// The time to wait between checking for event batches. Defaults to 2 seconds. @@ -164,17 +164,22 @@ public class ElasticsearchSinkOptions public IConnectionPool ConnectionPool { get; private set; } /// - /// Function to decide which index to write the LogEvent to + /// Function to decide which index to write the LogEvent to, when using file see: BufferIndexDecider /// public Func IndexDecider { get; set; } + /// + /// Function to decide which index to write the LogEvent to when using file buffer + /// Arguments is: logRow, DateTime of logfile + /// + public Func BufferIndexDecider { get; set; } /// /// Optional path to directory that can be used as a log shipping buffer for increasing the reliability of the log forwarding. 
/// public string BufferBaseFilename { get; set; } /// - /// The maximum size, in bytes, to which the buffer log file for a specific date will be allowed to grow. By default no limit will be applied. + /// The maximum size, in bytes, to which the buffer log file for a specific date will be allowed to grow. By default 100L * 1024 * 1024 will be applied. /// public long? BufferFileSizeLimitBytes { get; set; } @@ -183,6 +188,18 @@ public class ElasticsearchSinkOptions /// public TimeSpan? BufferLogShippingInterval { get; set; } + /// + /// An action to do when log row was denied by the elasticsearch because of the data (payload). + /// The arguments is: The log row, status code from server, error message + /// + public Func BufferCleanPayload { get; set; } + + /// + /// A soft limit for the number of bytes to use for storing failed requests. + /// The limit is soft in that it can be exceeded by any single error payload, but in that case only that single error + /// payload will be retained. + /// + public long? BufferRetainedInvalidPayloadsLimitBytes { get; set; } /// /// Customizes the formatter used when converting log events into ElasticSearch documents. Please note that the formatter output must be valid JSON :) /// @@ -224,6 +241,11 @@ public int QueueSizeLimit _queueSizeLimit = value; } } + /// + /// The maximum number of log files that will be retained, + /// including the current log file. For unlimited retention, pass null. The default is 31. + /// + public int? BufferFileCountLimit { get; set; } /// /// Configures the elasticsearch sink defaults @@ -235,12 +257,14 @@ public ElasticsearchSinkOptions() this.TypeName = "logevent"; this.Period = TimeSpan.FromSeconds(2); this.BatchPostingLimit = 50; - this.SingleEventSizePostingLimit = 0; + this.SingleEventSizePostingLimit = null; this.TemplateName = "serilog-events-template"; this.ConnectionTimeout = TimeSpan.FromSeconds(5); this.EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog; this.RegisterTemplateFailure = RegisterTemplateRecovery.IndexAnyway; this.QueueSizeLimit = 100000; + this.BufferFileCountLimit = 31; + this.BufferFileSizeLimitBytes = 100L * 1024 * 1024; } /// diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs index 2b277186..6f6cc2d3 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs @@ -36,6 +36,7 @@ public static ElasticsearchSinkState Create(ElasticsearchSinkOptions options) private readonly ElasticsearchSinkOptions _options; readonly Func _indexDecider; + readonly Func _bufferedIndexDecider; private readonly ITextFormatter _formatter; private readonly ITextFormatter _durableFormatter; @@ -64,6 +65,7 @@ private ElasticsearchSinkState(ElasticsearchSinkOptions options) _templateMatchString = IndexFormatRegex.Replace(options.IndexFormat, @"$1*$2"); _indexDecider = options.IndexDecider ?? ((@event, offset) => string.Format(options.IndexFormat, offset)); + _bufferedIndexDecider = options.BufferIndexDecider ?? 
((@event, offset) => string.Format(options.IndexFormat, offset)); _options = options; @@ -117,6 +119,13 @@ public string GetIndexForEvent(LogEvent e, DateTimeOffset offset) return _indexDecider(e, offset); } + public string GetBufferedIndexForEvent(string logEvent, DateTime offset) + { + if (!TemplateRegistrationSuccess && _options.RegisterTemplateFailure == RegisterTemplateRecovery.IndexToDeadletterIndex) + return string.Format(_options.DeadLetterIndexName, offset); + return _bufferedIndexDecider(logEvent, offset); + } + /// /// Register the elasticsearch index template if the provided options mandate it. /// diff --git a/test/Serilog.Sinks.Elasticsearch.Tests/Serilog.Sinks.Elasticsearch.Tests.csproj b/test/Serilog.Sinks.Elasticsearch.Tests/Serilog.Sinks.Elasticsearch.Tests.csproj index 41920be0..1d2f6290 100644 --- a/test/Serilog.Sinks.Elasticsearch.Tests/Serilog.Sinks.Elasticsearch.Tests.csproj +++ b/test/Serilog.Sinks.Elasticsearch.Tests/Serilog.Sinks.Elasticsearch.Tests.csproj @@ -33,10 +33,6 @@ - - - - @@ -70,6 +66,10 @@ + + + +
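Pulling the option changes together, a durable-mode configuration using the new and changed settings might look like the sketch below; the node URL, buffer path and index prefix are placeholders, and the commented values show the new defaults rather than required settings.

```csharp
using System;
using Serilog;
using Serilog.Sinks.Elasticsearch;

// Illustrative durable-mode configuration; URL, paths and index prefix are placeholders.
Log.Logger = new LoggerConfiguration()
    .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://localhost:9200"))
    {
        // Setting a buffer base filename switches the sink into the rewritten durable mode.
        BufferBaseFilename = "./logs/buffer",

        // New or changed in 7.0:
        BufferFileSizeLimitBytes = 100L * 1024 * 1024,        // default if not set
        BufferFileCountLimit = 31,                            // default; null = unlimited retention
        BufferRetainedInvalidPayloadsLimitBytes = 5_000_000,  // soft cap on invalid-* files
        SingleEventSizePostingLimit = null,                   // now long?; null = no per-event limit
        BufferIndexDecider = (logRow, date) => $"log-myapp-{date:yyyy.MM.dd}"
    })
    .CreateLogger();
```

As `GetBufferedIndexForEvent` above shows, the decider is bypassed in favour of the dead-letter index when template registration has failed and `RegisterTemplateFailure` is set to `IndexToDeadletterIndex`, matching the behaviour of the non-buffered `IndexDecider` path.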