|
@@ -85,6 +85,7 @@ namespace Vit.Core.Module.Log.LogCollector.ElasticSearch.Client
|
|
{
|
|
{
|
|
if (intervalMs.HasValue && intervalMs.Value > 0)
|
|
if (intervalMs.HasValue && intervalMs.Value > 0)
|
|
{
|
|
{
|
|
|
|
+ StringBuilder buffer = new StringBuilder();
|
|
recordList = new ConcurrentBag<ElasticSearchRecord>();
|
|
recordList = new ConcurrentBag<ElasticSearchRecord>();
|
|
recordList_Swap = new ConcurrentBag<ElasticSearchRecord>();
|
|
recordList_Swap = new ConcurrentBag<ElasticSearchRecord>();
|
|
time = new Util.Threading.Timer.SersTimer_SingleThread();
|
|
time = new Util.Threading.Timer.SersTimer_SingleThread();
|
|
@@ -94,7 +95,8 @@ namespace Vit.Core.Module.Log.LogCollector.ElasticSearch.Client
|
|
(recordList_Swap, recordList) = (recordList, recordList_Swap);
|
|
(recordList_Swap, recordList) = (recordList, recordList_Swap);
|
|
if (recordList_Swap.Count > 0)
|
|
if (recordList_Swap.Count > 0)
|
|
{
|
|
{
|
|
- SendToServer(recordList_Swap);
|
|
|
|
|
|
+ lock(buffer)
|
|
|
|
+ SendToServer(recordList_Swap, buffer);
|
|
while (recordList_Swap.TryTake(out _)) ;
|
|
while (recordList_Swap.TryTake(out _)) ;
|
|
}
|
|
}
|
|
};
|
|
};
|
|
@@ -106,28 +108,27 @@ namespace Vit.Core.Module.Log.LogCollector.ElasticSearch.Client
|
|
|
|
|
|
|
|
|
|
private System.Net.Http.HttpClient httpClient = null;
|
|
private System.Net.Http.HttpClient httpClient = null;
|
|
- private StringBuilder buffer = new StringBuilder();
|
|
|
|
|
|
|
|
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
|
|
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
|
|
- private void SendToServer(IEnumerable<ElasticSearchRecord> records)
|
|
|
|
|
|
+ private void SendToServer(IEnumerable<ElasticSearchRecord> records, StringBuilder buffer = null)
|
|
{
|
|
{
|
|
var request = new HttpRequestMessage(HttpMethod.Post, bulkUrl);
|
|
var request = new HttpRequestMessage(HttpMethod.Post, bulkUrl);
|
|
- lock (buffer)
|
|
|
|
|
|
+
|
|
|
|
+ if (buffer == null) buffer = new StringBuilder();
|
|
|
|
+ foreach (var record in records)
|
|
{
|
|
{
|
|
- buffer.Clear();
|
|
|
|
- foreach (var record in records)
|
|
|
|
- {
|
|
|
|
- buffer.AppendLine("{\"create\":{}}").AppendLine(record.Serialize());
|
|
|
|
- }
|
|
|
|
- request.Content = new StringContent(buffer.ToString(), Vit.Core.Module.Serialization.Serialization_Newtonsoft.defaultEncoding, "application/json");
|
|
|
|
- buffer.Clear();
|
|
|
|
|
|
+ buffer.AppendLine("{\"create\":{}}").AppendLine(record.Serialize());
|
|
}
|
|
}
|
|
|
|
+ request.Content = new StringContent(buffer.ToString(), Vit.Core.Module.Serialization.Serialization_Newtonsoft.defaultEncoding, "application/json");
|
|
|
|
+ buffer.Clear();
|
|
|
|
+
|
|
// TODO: retry when fail.
|
|
// TODO: retry when fail.
|
|
// batch: batchIntervalInSeconds, batchSizeLimit, queueLimit
|
|
// batch: batchIntervalInSeconds, batchSizeLimit, queueLimit
|
|
httpClient.SendAsync(request);
|
|
httpClient.SendAsync(request);
|
|
|
|
|
|
|
|
|
|
//var response = httpClient.SendAsync(request).Result;
|
|
//var response = httpClient.SendAsync(request).Result;
|
|
|
|
+ //var body = response.Content.ReadAsStringAsync().Result;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|