using Gebhardt.Shared; using Gebhardt.Shared.Process.ProducerConsumer; using Gebhardt.StoreWare.WcsWms.Constants; using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.EntityFramework; using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.EntityFramework.Models; using System; using System.Collections.Generic; using System.Linq; using System.Text.RegularExpressions; using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.Interfaces; using Gebhardt.StoreWare.WcsWms.InterfaceWcsWms.Services; using static Gebhardt.StoreWare.Wcs.Common.ConstantsCommon.WatchdogConstants; namespace Gebhardt.StoreWare.Wcs.HostBooking { internal class HostBookingProducer : Producer { private bool _firstExecution = true; private int _consumerQueueLength; private bool _useLoadBalancing; private readonly IHostMessageFromWmsService _service = new HostMessageFromWmsService(); /// /// Fügt im Dictionary dem angegebenen Consumer das Messagem hinzu /// /// /// /// private void AddDataForConsumer(ref Dictionary> dataForConsumers, string consumer, IHostMessage fromWms) { if (dataForConsumers.ContainsKey(consumer)) { dataForConsumers[consumer].Add(fromWms); } else { dataForConsumers.Add(consumer, new List { fromWms }); } } /// /// Bestimmt den Consumer, der für den Datensatz verantwortlich ist /// /// /// /// Consumer Name, wenn ein passender Consumer in der lsite ist, null, wenn kein passender Consumer in der Liste ist private string GetConsumerForBooking(List consumersWithDemand, string criterion) { //Gibt es eine Letzte Stelle, sonst default if (!criterion.IsNullOrEmptyOrDbEmpty()) { //Wir entscheiden mit der letzten Stelle der LE, welcher LE-Consumer verbucht oder ob der default Consumer das tun muss string endOfLe = criterion.Substring(criterion.Length - 1); //Der Name des Consumer endet mit dem gleichen Zeichen if (consumersWithDemand.Any(c => c.EndsWith(endOfLe))) { return consumersWithDemand.First(c => c.EndsWith(endOfLe)); } //Soll dieses Messagem von einem speziellen Consumer bearbeitet werden? Aber dieser ist beschäftigt else if (Regex.IsMatch(endOfLe, "[0-9]")) { //Messagem auslassen - keinem Consumer zuordnen return null; } //HU endet mit einem Zeichen, dass zu keinem Consumer passt also Default oder auslassen else { //Default Consumer muss mit Default enden! return consumersWithDemand.FirstOrDefault(c => c.EndsWith("Default")); } } else { //Default Consumer muss mit Default enden! return consumersWithDemand.FirstOrDefault(c => c.EndsWith("Default")); } } protected override Dictionary> GetDataForConsumers(List consumersWithDemand) { //TODO: jub evtl. die Messagem Stau Meldung aus KW übernehmen Dictionary> dataForConsumers = new Dictionary>(); try { //using HostEntities db = new HostEntitiesFactory(HostEntities.DefaultConnectionStringName).Create(); if (consumersWithDemand.Count > 0) { List fromWmsEntries; if (_firstExecution) { //beim ersten mal nach Neustart werden die InProgress Messages nochmal auf Pending zurückgesetzt //Ref == null damit nur Kopfnachrichten gefunden werden fromWmsEntries = _service.GetAllEntries(t => t.Status == Status.InProgress).ToList(); using HostEntities db = new HostEntitiesFactory(HostEntities.DefaultConnectionStringName).Create(); fromWmsEntries.ForEach(message => { FromWms fromWms = HostMessageFromWmsService.HostMapper.Map(message); db.Attach(fromWms); fromWms.Status = Status.Pending; }); db.SaveChanges(); Log.Write(LogLevel.Info, $"Erste Produce Schleife nach Neustart, setze {fromWmsEntries.Count} Telegramme zur Sicherheit nochmals von InProgress auf Pending"); _firstExecution = false; } //nur Messages, die noch an keinen Consumer gegeben wurden //es werden maximal so viele Messages abgerufen, wie ein einzelner Consumer annehmen könnte, damit das TryAdd sicher klappt und der Status nicht fälschlich //gesetzt wird //Ref == null damit nur Kopfnachrichten gefunden werden fromWmsEntries = _service.GetAllEntries(a => a.Status == Status.Pending).OrderBy(a => a.Id).Take(_consumerQueueLength).ToList();// db.FromWms.Where(a => a.Ref == null).Where(a => a.Status == Status.Pending).OrderBy(a => a.Id).Take(_consumerQueueLength).ToList(); if (fromWmsEntries.Any()) { using HostEntities db = new HostEntitiesFactory(HostEntities.DefaultConnectionStringName).Create(); //ohne LoadBalancing erhält der erste Consumer alle Messages zum Verbuchen if (!_useLoadBalancing) { //der Consumer muss also alle Messages verarbeiten und nicht nur loggen dataForConsumers.Add(consumersWithDemand.FirstOrDefault() ?? string.Empty, fromWmsEntries); //Status auf InProgress damit jede Message nur einmal abgerufen wird fromWmsEntries.ForEach(message => { FromWms fromWms = HostMessageFromWmsService.HostMapper.Map(message); db.Attach(fromWms); fromWms.Status = Status.InProgress; }); Log.Write(LogLevel.Low, $"Kein LoadBalancing aktiv, {fromWmsEntries.Count} neue HostMessages für Consumer {consumersWithDemand.FirstOrDefault()} gefunden"); } //mit LoadBalancing wird nach der letzen Ziffer der ersten HU der Message auf 10 Consumer verteilt, //enthält die Message keine HU wird es an Consumer 11 übergeben. else { Log.Write(LogLevel.Low, $"LoadBalancing aktiv, {fromWmsEntries.Count} neue HostMessages gefunden, verteile auf Consumer"); foreach (IHostMessage message in fromWmsEntries) { try { FromWms fromWms = HostMessageFromWmsService.HostMapper.Map(message); db.Attach(fromWms); Log.Write(LogLevel.Low, $"Producerschleife für {fromWms}"); string consumer; //keine HU in der Message, dann dem Default Consumer zuordnen if (fromWms.LeNo.IsNullOrEmptyOrDbEmpty()) { consumer = GetConsumerForBooking(consumersWithDemand, null); //wenn der Default Consumer nicht in der Liste war, Message auslassen if (consumer != null) { AddDataForConsumer(ref dataForConsumers, consumer, message); //Status auf InProgress damit die Message nur einmal abgerufen wird fromWms.Status = Status.InProgress; } } //Messages mit HU else { consumer = GetConsumerForBooking(consumersWithDemand, fromWms.LeNo); AddDataForConsumer(ref dataForConsumers, consumer, message); //Status auf InProgress damit die Message nur einmal abgerufen wird fromWms.Status = Status.InProgress; } } catch (Exception ex) { Log.WriteException(ex); } } Log.Write(LogLevel.Low, $"Producerschleife beendet"); } //Status Updates für alle weitergereichten Messages db.SaveChanges(); } else { Log.Write(LogLevel.Debug, 60, "Keine Messages in FromWms"); } } else { Log.Write(LogLevel.Debug, 60, "Kein Consumer hat demand"); } return dataForConsumers; } catch (Exception e) { Log.WriteException(e); return dataForConsumers; } } public HostBookingProducer(int workinterval, bool useLoadBalancing, int consumerQueueLength) : base(typeof(HostBookingProducer).Name, workinterval, true) { _consumerQueueLength = consumerQueueLength; _useLoadBalancing = useLoadBalancing; } } }