Implementacja wątku opróżniającego kolejkę?

0

Witam,

Jak ugryźć temat wątku który ma opróżnić kolejkę? Gdy kolejka jest pusta śpi zaś gdy pojawią się dane próbuje je przetworzyć w kolejności nadchodzenia prac? Jest na to jakiś sprytny sposób czy tylko ciągłe sprawdzanie kolejki i usypianie wątku na kilka sekund po stwierdzeniu że kolejka jest pusta?

Pozdrawiam,

mr-owl

0

Witam,

Danych przychodzą z event-u, w porcjach do 5000, a przetworzenie pojedynczej porcji (zawsze przez jeden wątek) może trwać po kilkadziesiąt sekund :-(

Pozdrawiam,

mr-owl

0

Jeśli nie chcesz odpytywać kolejki (dlaczego nie?), to musisz mieć jakieś managera wątku.
Po dodaniu do kolejki elementu, manager odbiera event i rozpoczyna wątek (jeśli nie jest już rozpoczęty). Wątek pobiera sobie element z kolejki i go przetwarza. Jak go już przetworzy, to pobiera następny element itd. W momencie, gdy wątek stwierdzi, że nie ma już niczego do obronienia, kończy się.

0

Witam,

Szukam jakiegoś przykładowego rozwiązania (kodu) jeśli można prosić...

Pozdrawiam,

mr-owl

0

Napisz jeszcze ilu jest piszących a ilu czytających z kolejki.

0

Witam,

Do kolejki będzie pisał jeden event za to w bardzo krótkim czasie może dodać wiele pojedynczych rekordów (long) i powinien być jeden wątek który odpowiada za opróżnianie tej kolejki po przetworzeniu rekordu (wgranie danych poprzez FTP na zdalny serwer oraz aktualizacja bazy danych).

Pozdrawiam,

mr-owl

P.S. Wymyśliłem coś takiego ale rozumiem jeszcze dlaczego pętla wykonuje się 2 razy po kilkukrotnym odpaleniu funkcji Run

 namespace QueueSample
{
    using System;
    using System.Threading;

    public sealed class DequeueThreadedJob : IDisposable
    {
        private EventWaitHandle syncEvent;

        private EventWaitHandle[] syncEventsArray;

        private Thread thread;

        public DequeueThreadedJob()
        {
            this.syncEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
            this.syncEventsArray = new EventWaitHandle[1];
            this.syncEventsArray[0] = this.syncEvent;

            this.thread = new Thread(this.DoWork);
            this.thread.Start();
        }

        public void Run()
        {
            this.syncEvent.Set();
        }

        private void DoWork()
        {
            while (this.syncEvent.WaitOne())
            {
                for (var i = 1; i <= 30; i++)
                {
                    Console.WriteLine("i = {0}", i);
                    Thread.Sleep(100);
                }
            }
        }

        public void Dispose()
        {
            this.syncEvent.Close();
        }
    }
}
0

Coś takiego (pisane z palca, bez środowiska, więc potraktuj to jako pseudokod):


//klasa operująca na danych
class MyData
{
  static Object mLock = new Object();
  static Stack<int> mStack = new Stack<int>(); //jest jakaś taka klasa w C#
  public static void AddToStack(int value)
  {
    lock(mLock)
    {
      mStack.Push(value);
    }
    StackListener.GetInstance.ItemAdded();
  }

  public static int GetFromStack()
  {
    lock(mLock)
    {
      return mStack.Pop();
    }
  }

  public static bool StackIsEmpty()
  {
    return mStack.Length == 0;
  }
}

//ten manager - zarządzacz
class StackListener
{
  //to jest singleton, nie opisuję, jak się tworzy singleton. Ale nie musi być singletonem

  Thread th = new Thread(DoWork);

  void DoWork()
  {
    while(!MyData.StackIsEmpty())
    {
      int item = MyData.GetFromStack();
      //tutaj obrabiamy pobrany item

      Thread.Sleep(100);
    }
  }

  public void ItemAdded()
  {
    //sprawdź, czy wątek już nie jest uruchomiony. Jeśli jest, to return
    th.Start();
  }
}

Mniej więcej tak to powinno wyglądać. No, zrobiłem to właściwie na klasach statycznych. Poprawniej będą to normalne klasy pewnie oparte na interfejsach. Ale tak to widzę.

0

Witam,

Ten numer to chyba nie przejdzie bo nie mogę wielokrotnie uruchamiać wątku. Problemem nie jest implementacja FILO a budowa wątku który przez większość czasu jest uśpiony a aktywny gdy pojawi się większa kolekcja danych do przetworzenia.

Pozdrawiam,

mr-owl

0

Witam,

Czy można to jeszcze jakoś ładniej zrealizować?

namespace QueueSample
{
    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    public sealed class DequeueThreadedJob : IDisposable
    {
        private readonly EventWaitHandle syncEvent = new EventWaitHandle(false, EventResetMode.AutoReset);

        private readonly ConcurrentQueue<long> queue = new ConcurrentQueue<long>();

        public DequeueThreadedJob()
        {
            var thread = new Thread(this.DoWork);
            thread.Start();
        }

        public void Add(long ean)
        {
            this.queue.Enqueue(ean);
            this.syncEvent.Set();
        }

        private void DoWork()
        {
            while (this.syncEvent.WaitOne())
            {
                while (!this.queue.IsEmpty)
                {
                    long ean;

                    if (this.queue.TryDequeue(out ean))
                    {
                        Console.WriteLine("Do {0}", ean);
                        Thread.Sleep(1000);
                    }
                }

                Console.WriteLine("Queue Is Empty");
            }
        }

        public void Dispose()
        {
            this.syncEvent.Close();
        }
    }
}

Pozdrawiam,

mr-owl

0
  1. Rozserz ConcurrentQueue o interfejs INotifyCollectionChanged,
  2. Kiedy event się odpali uruchamiasz zadanie w lekkim Tasku - wątek w tym przypadku to według mnie marnowanie zasobów.
0

Witam,

Możesz proszę swoje słowa poprzeć jakimś kodem?

Pozdrawiam,

mr-owl

0

Coś takiego na szybko klepnąłem.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            var collection = new MyCollection<int>();
            var task = Task.Factory.StartNew(() =>
            {
                collection.CollectionChanged += (sender, eventArgs) =>
                {
                    if(eventArgs.Action != NotifyCollectionChangedAction.Add)
                        return;

                    int item;
                    if (collection.TryGet(out item))
                    {
                        Console.WriteLine("Pobrano: "+item);
                        Task.Delay(500).Wait(); //symulacja trwania jakiejs logiki
                    };
                };
            });
            var task2 = Task.Factory.StartNew(() =>
            {
                for (var i = 0; i < 100; i++)
                {
                    Console.WriteLine("Dodano: " + i);
                    collection.Add(i);
                    
                }
            });
            Console.ReadLine();
        }
    }

    public class MyCollection<T> : INotifyCollectionChanged
    {
        public event NotifyCollectionChangedEventHandler CollectionChanged;
        private readonly ConcurrentQueue<T> collection;

        public MyCollection()
        {
            collection = new ConcurrentQueue<T>();
        }

        public void Add(T item)
        {
            collection.Enqueue(item);
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
        }

        public bool TryGet(out T item)
        {
            if (!collection.TryDequeue(out item)) return false;
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove,item));
            return true;
        }

        public int Count()
        {
            return collection.Count;
        }

        private void OnCollectionChanged(NotifyCollectionChangedEventArgs args)
        {
            CollectionChanged?.Invoke(this, args);
        }
    }
}
0

Bardzo dziękuję za przykład, zaraz sprawdzę czy działa tak jak tego potrzebuję?

mr-owl

0

Witam,

Albo nie rozumiem jak to działa albo nie działa tak jak oczekuję, chciałbym dodać w krótkim czasie 15000 rekordów (przychodzą z zewnętrznego systemu) i sukcesywnie je przetwarzać a tutaj mam przetwarzanie OnRequest, rekord po rekordzie w trakcie odbierania prac z zewnętrznego systemu.

Przeprasza ale nie o to mi chodzi,

mr-owl

0

Ale kombinujecie :D, mi to wygląda na klasyczny producent-konsument który w C# jest bardzo łatwo rozwiązywalny out of box za pomocą BlockingCollection :
https://msdn.microsoft.com/pl-pl/library/dd267312(v=vs.110).aspx

na samym dole masz przykład i opis działania.

1 użytkowników online, w tym zalogowanych: 0, gości: 1