203 lines
8.9 KiB
C#
203 lines
8.9 KiB
C#
using Gebhardt.Shared;
|
|
using Gebhardt.Shared.Process.ProducerConsumer;
|
|
using Gebhardt.StoreWare.WcsWms.Constants;
|
|
using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.EntityFramework;
|
|
using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.EntityFramework.Models;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text.RegularExpressions;
|
|
using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.Interfaces;
|
|
using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.Services;
|
|
using static Gebhardt.StoreWare.Wcs.Common.ConstantsCommon.WatchdogConstants;
|
|
|
|
namespace Gebhardt.StoreWare.Wcs.HostBooking
|
|
{
|
|
internal class HostBookingProducer : Producer<IHostMessage>
|
|
{
|
|
private bool _firstExecution = true;
|
|
|
|
private int _consumerQueueLength;
|
|
|
|
private bool _useLoadBalancing;
|
|
|
|
private readonly IHostMessageFromWmsService _service = new HostMessageFromWmsService();
|
|
|
|
/// <summary>
|
|
/// Fügt im Dictionary dem angegebenen Consumer das Messagem hinzu
|
|
/// </summary>
|
|
/// <param name="dataForConsumers"></param>
|
|
/// <param name="consumer"></param>
|
|
/// <param name="fromWms"></param>
|
|
private void AddDataForConsumer(ref Dictionary<string, List<IHostMessage>> dataForConsumers, string consumer, IHostMessage fromWms)
|
|
{
|
|
if (dataForConsumers.ContainsKey(consumer))
|
|
{
|
|
dataForConsumers[consumer].Add(fromWms);
|
|
}
|
|
else
|
|
{
|
|
dataForConsumers.Add(consumer, new List<IHostMessage> { fromWms });
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Bestimmt den Consumer, der für den Datensatz verantwortlich ist
|
|
/// </summary>
|
|
/// <param name="consumersWithDemand"></param>
|
|
/// <param name="criterion"></param>
|
|
/// <returns>Consumer Name, wenn ein passender Consumer in der lsite ist, null, wenn kein passender Consumer in der Liste ist</returns>
|
|
private string GetConsumerForBooking(List<string> consumersWithDemand, string criterion)
|
|
{
|
|
//Gibt es eine Letzte Stelle, sonst default
|
|
if (!criterion.IsNullOrEmptyOrDbEmpty())
|
|
{
|
|
//Wir entscheiden mit der letzten Stelle der LE, welcher LE-Consumer verbucht oder ob der default Consumer das tun muss
|
|
string endOfLe = criterion.Substring(criterion.Length - 1);
|
|
//Der Name des Consumer endet mit dem gleichen Zeichen
|
|
if (consumersWithDemand.Any(c => c.EndsWith(endOfLe)))
|
|
{
|
|
return consumersWithDemand.First(c => c.EndsWith(endOfLe));
|
|
}
|
|
//Soll dieses Messagem von einem speziellen Consumer bearbeitet werden? Aber dieser ist beschäftigt
|
|
else if (Regex.IsMatch(endOfLe, "[0-9]"))
|
|
{
|
|
//Messagem auslassen - keinem Consumer zuordnen
|
|
return null;
|
|
}
|
|
//HU endet mit einem Zeichen, dass zu keinem Consumer passt also Default oder auslassen
|
|
else
|
|
{
|
|
//Default Consumer muss mit Default enden!
|
|
return consumersWithDemand.FirstOrDefault(c => c.EndsWith("Default"));
|
|
}
|
|
}
|
|
else
|
|
{
|
|
//Default Consumer muss mit Default enden!
|
|
return consumersWithDemand.FirstOrDefault(c => c.EndsWith("Default"));
|
|
}
|
|
}
|
|
|
|
protected override Dictionary<string, List<IHostMessage>> GetDataForConsumers(List<string> consumersWithDemand)
|
|
{
|
|
//TODO: jub evtl. die Messagem Stau Meldung aus KW übernehmen
|
|
Dictionary<string, List<IHostMessage>> dataForConsumers = new Dictionary<string, List<IHostMessage>>();
|
|
try
|
|
{
|
|
//using HostEntities db = new HostEntitiesFactory(HostEntities.DefaultConnectionStringName).Create();
|
|
if (consumersWithDemand.Count > 0)
|
|
{
|
|
List<IHostMessage> fromWmsEntries;
|
|
|
|
if (_firstExecution)
|
|
{
|
|
//beim ersten mal nach Neustart werden die InProgress Messages nochmal auf Pending zurückgesetzt
|
|
//Ref == null damit nur Kopfnachrichten gefunden werden
|
|
fromWmsEntries = _service.GetAllEntries(t => t.Status == Status.InProgress).ToList();
|
|
using HostEntities db = new HostEntitiesFactory(HostEntities.DefaultConnectionStringName).Create();
|
|
fromWmsEntries.ForEach(message =>
|
|
{
|
|
FromWms fromWms = HostMessageFromWmsService.HostMapper.Map<FromWms>(message);
|
|
db.Attach(fromWms);
|
|
fromWms.Status = Status.Pending;
|
|
});
|
|
db.SaveChanges();
|
|
Log.Write(LogLevel.Info, $"Erste Produce Schleife nach Neustart, setze {fromWmsEntries.Count} Telegramme zur Sicherheit nochmals von InProgress auf Pending");
|
|
_firstExecution = false;
|
|
}
|
|
|
|
//nur Messages, die noch an keinen Consumer gegeben wurden
|
|
//es werden maximal so viele Messages abgerufen, wie ein einzelner Consumer annehmen könnte, damit das TryAdd sicher klappt und der Status nicht fälschlich
|
|
//gesetzt wird
|
|
//Ref == null damit nur Kopfnachrichten gefunden werden
|
|
fromWmsEntries = _service.GetAllEntries(a => a.Status == Status.Pending).OrderBy(a => a.Id).Take(_consumerQueueLength).ToList();// db.FromWms.Where(a => a.Ref == null).Where(a => a.Status == Status.Pending).OrderBy(a => a.Id).Take(_consumerQueueLength).ToList();
|
|
|
|
|
|
if (fromWmsEntries.Any())
|
|
{
|
|
using HostEntities db = new HostEntitiesFactory(HostEntities.DefaultConnectionStringName).Create();
|
|
//ohne LoadBalancing erhält der erste Consumer alle Messages zum Verbuchen
|
|
if (!_useLoadBalancing)
|
|
{
|
|
//der Consumer muss also alle Messages verarbeiten und nicht nur loggen
|
|
dataForConsumers.Add(consumersWithDemand.FirstOrDefault() ?? string.Empty, fromWmsEntries);
|
|
//Status auf InProgress damit jede Message nur einmal abgerufen wird
|
|
fromWmsEntries.ForEach(message => {
|
|
FromWms fromWms = HostMessageFromWmsService.HostMapper.Map<FromWms>(message);
|
|
db.Attach(fromWms);
|
|
fromWms.Status = Status.InProgress;
|
|
});
|
|
Log.Write(LogLevel.Low, $"Kein LoadBalancing aktiv, {fromWmsEntries.Count} neue HostMessages für Consumer {consumersWithDemand.FirstOrDefault()} gefunden");
|
|
}
|
|
//mit LoadBalancing wird nach der letzen Ziffer der ersten HU der Message auf 10 Consumer verteilt,
|
|
//enthält die Message keine HU wird es an Consumer 11 übergeben.
|
|
else
|
|
{
|
|
Log.Write(LogLevel.Low, $"LoadBalancing aktiv, {fromWmsEntries.Count} neue HostMessages gefunden, verteile auf Consumer");
|
|
|
|
foreach (IHostMessage message in fromWmsEntries)
|
|
{
|
|
try
|
|
{
|
|
FromWms fromWms = HostMessageFromWmsService.HostMapper.Map<FromWms>(message);
|
|
db.Attach(fromWms);
|
|
Log.Write(LogLevel.Low, $"Producerschleife für {fromWms}");
|
|
string consumer;
|
|
//keine HU in der Message, dann dem Default Consumer zuordnen
|
|
if (fromWms.LeNo.IsNullOrEmptyOrDbEmpty())
|
|
{
|
|
consumer = GetConsumerForBooking(consumersWithDemand, null);
|
|
//wenn der Default Consumer nicht in der Liste war, Message auslassen
|
|
if (consumer != null)
|
|
{
|
|
AddDataForConsumer(ref dataForConsumers, consumer, message);
|
|
//Status auf InProgress damit die Message nur einmal abgerufen wird
|
|
fromWms.Status = Status.InProgress;
|
|
}
|
|
}
|
|
//Messages mit HU
|
|
else
|
|
{
|
|
consumer = GetConsumerForBooking(consumersWithDemand, fromWms.LeNo);
|
|
AddDataForConsumer(ref dataForConsumers, consumer, message);
|
|
//Status auf InProgress damit die Message nur einmal abgerufen wird
|
|
fromWms.Status = Status.InProgress;
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.WriteException(ex);
|
|
}
|
|
}
|
|
Log.Write(LogLevel.Low, $"Producerschleife beendet");
|
|
}
|
|
//Status Updates für alle weitergereichten Messages
|
|
db.SaveChanges();
|
|
}
|
|
else
|
|
{
|
|
Log.Write(LogLevel.Debug, 60, "Keine Messages in FromWms");
|
|
}
|
|
}
|
|
else
|
|
{
|
|
Log.Write(LogLevel.Debug, 60, "Kein Consumer hat demand");
|
|
}
|
|
|
|
return dataForConsumers;
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
Log.WriteException(e);
|
|
return dataForConsumers;
|
|
}
|
|
}
|
|
|
|
public HostBookingProducer(int workinterval, bool useLoadBalancing, int consumerQueueLength) : base(typeof(HostBookingProducer).Name, workinterval, true)
|
|
{
|
|
_consumerQueueLength = consumerQueueLength;
|
|
_useLoadBalancing = useLoadBalancing;
|
|
}
|
|
}
|
|
} |