C#|.NET : Generic Concurrent Queue (6/6)


Visit these posts for Part 1, Part 2, Part 3, Part 4, Part 5, and real life implementation

In this concluding post we will separate a few more concerns and see multiple queues in action. First, we will move queue initialization up to the application level and define the queues in our top-level class, i.e. App.xaml.cs, so that they are available everywhere in the application. Next, we will move background enqueuing into the task class itself and expose it through a method. This encapsulates the self-enqueuing behavior in the class: the user of the class does not have to know anything about the queue, and can simply call the class’ method to have the instance queue itself for background work. Let’s start.
First, let’s modify the code of TypeOneTask class we created in Part 5, and encapsulate enqueue behavior in the class itself. We will introduce a new public method QueueInForBackgroundWork:

        public void QueueInForBackgroundWork()
        {
            Debug.WriteLine("UI Thread : {0} enqueue starting", this.Name);
            App.t1Queue.Process(this, true, false);
            Debug.WriteLine("UI Thread : {0} enqueue finished", this.Name);
        }

Before we get to the code that consumes this method in our main page’s code-behind, let’s first set up the TypeTwoTask class and a new queue for it. We will create a new class file, TaskTwoQueue.cs, and write the code there. Most of the code in these classes is similar to what we created in Part 5, so I will put all of it in one code block:

namespace TaskQueue
{
    public class TaskTwoQueue : ConcurrentQueue<TypeTwoTask>
    {
        #region queue implementations
        TypeTwoTaskEqualityComparer _itemEqualityComparer = new TypeTwoTaskEqualityComparer();
        public override bool IsQueueAlive
        {
            get
            {
                return base.IsQueueAlive;
            }
        }
        public override IEqualityComparer<TypeTwoTask> QueueItemEualityComparer
        {
            get
            {
                return _itemEqualityComparer;
            }
        }
        public override bool IsQueueAvailableForEnqueue()
        {
            if (this.Count > 10)
            {
                return false;
            }
            else
            {
                return true;
            }
        }
        protected override void ProcessRequest(TypeTwoTask taskRequest, bool Async)
        {
            taskRequest.Process();
        }
        public override void Exit(bool discardPendingRequests)
        {
            base.Exit(discardPendingRequests);
        }
        public override void DoWhenQueueGetsEmpty()
        {
            //doing nothing
        }
        #endregion
    }

    public class TypeTwoTask
    {
        Random testWait = new Random();
        public string Name { get; set; }
        public event EventHandler BackgroundProcessFinished;
        public TypeTwoTask(string name)
        {
            Name = name;
        }
        public void Process()
        {
            DateTime _st = DateTime.Now;
            Debug.WriteLine("BG Thread : Task {0} starting @ {1}", Name, _st.ToString("hh:mm:ss.fff"));
            Thread.CurrentThread.Join(testWait.Next(735, 1923));
            Debug.WriteLine("BG Thread : Task {0} Ended @ {1}", Name, DateTime.Now.ToString("hh:mm:ss.fff"));
            Deployment.Current.Dispatcher.BeginInvoke(() =>
            {
                if (BackgroundProcessFinished != null)
                {
                    BackgroundProcessFinished(this, new EventArgs());
                }
            });
        }
        public void QueueInForBackgroundWork()
        {
            Debug.WriteLine("UI Thread : {0} enqueue starting", this.Name);
            App.t2Queue.Process(this, true, false);
            Debug.WriteLine("UI Thread : {0} enqueue finished", this.Name);
        }
    }

    public class TypeTwoTaskEqualityComparer : IEqualityComparer<TypeTwoTask>
    {
        public bool Equals(TypeTwoTask A, TypeTwoTask B)
        {
            if (A != null && B != null)
            {
                return A.Name == B.Name;
            }
            else
            {
                return false;
            }
        }
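        public int GetHashCode(TypeTwoTask itm)
        {
            // Hash on Name so that the hash code agrees with Equals, which compares by Name.
            return (itm == null || itm.Name == null) ? 0 : itm.Name.GetHashCode();
        }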
    }

}

Our second queue is ready. We will introduce it at the application level: in App.xaml.cs we define both queues like so (note that both queues are static):

    public partial class App : Application
    {
        public static TaskOneQueue t1Queue = new TaskOneQueue();
        public static TaskTwoQueue t2Queue = new TaskTwoQueue();
    }

The final user of our two classes, the main page, no longer needs to know about the queues (for the previous code see Part 5). It only defines fields of the appropriate task types and calls QueueInForBackgroundWork as and when required. To demonstrate this, we will modify MainPage.xaml.cs. We first define three tasks of each type at the page level. We then attach event handlers to tasks of both types so that the page is informed on the UI thread when a task completes. Button_Click enqueues the TypeOneTask instances by calling their QueueInForBackgroundWork method, and buttonDummy1_Click does the same for the TypeTwoTask instances. Here is the code:

    public partial class MainPage : PhoneApplicationPage
    {
        TypeOneTask _t1 = new TypeOneTask("Type1 Task-1");
        TypeOneTask _t2 = new TypeOneTask("Type1 Task-2");
        TypeOneTask _t3 = new TypeOneTask("Type1 Task-3");

        TypeTwoTask _t4 = new TypeTwoTask("Type2 Task-4");
        TypeTwoTask _t5 = new TypeTwoTask("Type2 Task-5");
        TypeTwoTask _t6 = new TypeTwoTask("Type2 Task-6");

        public MainPage()
        {
            InitializeComponent();
            this.BackKeyPress += new EventHandler<System.ComponentModel.CancelEventArgs>(MainPage_BackKeyPress);
            _t2.BackgroundProcessFinished += (object sender, EventArgs e) => { Debug.WriteLine("UI Thread : TypeOneTask _t1 finished!"); };
            _t3.BackgroundProcessFinished += (object sender, EventArgs e) => { Debug.WriteLine("UI Thread : TypeOneTask _t2 finished!"); };
            _t5.BackgroundProcessFinished += (object sender, EventArgs e) => { Debug.WriteLine("UI Thread : TypeTwoTask _t5 finished!"); };
            _t6.BackgroundProcessFinished += (object sender, EventArgs e) => { Debug.WriteLine("UI Thread : TypeTwoTask _t6 finished!"); };
        }

        // Handler registered in the constructor; as in Part 5 it exits the queues,
        // which now live at the application level.
        void MainPage_BackKeyPress(object sender, System.ComponentModel.CancelEventArgs e)
        {
            App.t1Queue.Exit(true);
            App.t2Queue.Exit(true);
        }

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            _t1.QueueInForBackgroundWork();
            _t2.QueueInForBackgroundWork();
            _t3.QueueInForBackgroundWork();
        }
        private void buttonDummy1_Click(object sender, RoutedEventArgs e)
        {
            _t4.QueueInForBackgroundWork();
            _t5.QueueInForBackgroundWork();
            _t6.QueueInForBackgroundWork();
        }

        private void buttonDummy2_Click(object sender, RoutedEventArgs e)
        {
            Debug.WriteLine("UI Thread : Dummy button 2 clicked!");
        }

    }

When you run the app in debug mode, you get three buttons – Button1, Dummy 1, and Dummy 2, same as we had in Part 5. This time Button1 and Dummy 1 both enqueue tasks, and Dummy 2 is only there to demonstrate that the UI thread is free. Now click these three buttons top to bottom, 1-2 seconds apart. You will get the following log in your Debug output:

Output.Debug

UI Thread : Type1 Task-1 enqueue starting
UI Thread : Type1 Task-1 enqueue finished
UI Thread : Type1 Task-2 enqueue starting
UI Thread : Type1 Task-2 enqueue finished
UI Thread : Type1 Task-3 enqueue starting
UI Thread : Type1 Task-3 enqueue finished
BG Thread : Task Type1 Task-1 starting @ 12:29:35.530
UI Thread : Type2 Task-4 enqueue starting
UI Thread : Type2 Task-4 enqueue finished
BG Thread : Task Type2 Task-4 starting @ 12:29:36.502
UI Thread : Type2 Task-5 enqueue starting
UI Thread : Type2 Task-5 enqueue finished
UI Thread : Type2 Task-6 enqueue starting
UI Thread : Type2 Task-6 enqueue finished
BG Thread : Task Type1 Task-1 Ended @ 12:29:37.403
BG Thread : Task Type1 Task-2 starting @ 12:29:37.413
BG Thread : Task Type2 Task-4 Ended @ 12:29:37.575
BG Thread : Task Type2 Task-5 starting @ 12:29:37.581
UI Thread : Dummy button 2 clicked!
BG Thread : Task Type1 Task-2 Ended @ 12:29:38.500
BG Thread : Task Type1 Task-3 starting @ 12:29:38.506
UI Thread : TypeOneTask _t1 finished!
BG Thread : Task Type2 Task-5 Ended @ 12:29:38.635
BG Thread : Task Type2 Task-6 starting @ 12:29:38.642
UI Thread : TypeTwoTask _t5 finished!
BG Thread : Task Type1 Task-3 Ended @ 12:29:39.576
UI Thread : TypeOneTask _t2 finished!
BG Thread : Task Type2 Task-6 Ended @ 12:29:39.691
UI Thread : TypeTwoTask _t6 finished!

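The output shows that the Type1 tasks were enqueued first, because I clicked Button1 first, and that the UI thread was freed as soon as all three tasks were enqueued. “Task Type1 Task-1” then started its BG work. Because the UI was free, I clicked the Dummy 1 button about a second later, which enqueued all three TypeTwoTask tasks in their own queue. Both queues work on their own background threads, so their tasks overlap in time, while the UI thread stays completely free (the Dummy 2 click is logged in between). When a task finishes, the UI thread gets a message. Note that the TypeOneTask completion messages are labeled one off: the handlers are attached to _t2 and _t3 but log “_t1 finished!” and “_t2 finished!”, which is why “TypeOneTask _t1 finished!” appears right after Task-2 ends.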

To summarize, Generic Concurrent Queue has following characteristics:

  • Generic and abstract, to be inherited to implement for any type.
  • Process requests can be queued up for BG thread.
  • Has a single lazy-init BG thread.
  • Supports sync and async requests.
  • A sync process request takes priority: pending items in the queue wait while the sync request is processed, and the queue resumes automatically once the sync task is over.
  • Queue can be paused and restarted as and when required.
  • Queue can be exited (killed) as and when required.
  • Inherited class defines rules for Queue capacity.
  • Inherited class defines rules for detecting duplicate process requests.
  • Uses basic lock/wait/pulse mechanism, so compatible with almost all variants of .NET.
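
The last characteristic, plain lock/wait/pulse, can be illustrated in isolation. What follows is a minimal sketch and not the ConcurrentQueue<T> class from this series: MiniWorkQueue, Enqueue, and ExitAndWait are hypothetical names, and the sketch only shows a lazily started single worker thread that waits on a Monitor until work arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, stripped-down worker queue (an assumption for illustration only).
public class MiniWorkQueue
{
    private readonly object _locker = new object();
    private readonly Queue<Action> _items = new Queue<Action>();
    private Thread _worker;   // created lazily on the first Enqueue
    private bool _exit;

    public void Enqueue(Action work)
    {
        if (work == null) throw new ArgumentNullException("work");
        lock (_locker)
        {
            if (_exit) return;            // queue is closed: ignore new work
            _items.Enqueue(work);
            if (_worker == null)
            {
                _worker = new Thread(Consume);
                _worker.IsBackground = true;
                _worker.Start();
            }
            Monitor.Pulse(_locker);       // wake the worker if it is waiting
        }
    }

    // Lets already queued items finish, then stops the worker and waits for it.
    public void ExitAndWait()
    {
        Thread worker;
        lock (_locker)
        {
            _exit = true;
            worker = _worker;
            Monitor.Pulse(_locker);
        }
        if (worker != null) worker.Join();
    }

    private void Consume()
    {
        while (true)
        {
            Action next;
            lock (_locker)
            {
                while (_items.Count == 0 && !_exit)
                {
                    Monitor.Wait(_locker);   // releases the lock while waiting
                }
                if (_items.Count == 0) return;   // exit requested, nothing left
                next = _items.Dequeue();
            }
            next();   // run the work outside the lock so producers are not blocked
        }
    }
}
```

Items run one at a time in FIFO order on the single worker thread, and the caller of Enqueue returns as soon as the item is stored, which is the same division of labor the UI thread and the BG thread have in the logs above.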

Please feel free to comment. Hope you find this concurrent queue useful.

C#|.NET : Generic Concurrent Queue (5/6)

A bench by Myxi, on Flickr

Visit these posts for Part 1, Part 2, Part 3, Part 4, and implementation!

The real magic starts now…

First of all we will create an example task class (could be any model/view-model class), which is our real world class with the code to be processed on BG thread. The class encapsulates its own behavior.

    public class TypeOneTask
    {
        Random _testWait = new Random();
        public string Name { get; set; }
        public event EventHandler BackgroundProcessFinished;
        public TypeOneTask(string name)
        {
            Name = name;
        }
        public void Process()
        {
            DateTime _st = DateTime.Now;
            Debug.WriteLine("BG Thread : Task {0} Started @ {1}", Name, _st.ToString("hh:mm:ss.fff"));
            Thread.CurrentThread.Join(_testWait.Next(735, 1923));
            Debug.WriteLine("BG Thread : Task {0} Ended @ {1}", Name, DateTime.Now.ToString("hh:mm:ss.fff"));
            Deployment.Current.Dispatcher.BeginInvoke(() =>
            {
                if (BackgroundProcessFinished != null)
                {
                    BackgroundProcessFinished(this, new EventArgs());
                }
            });
        }
    }

This is a simple example class with a Name property and a Process method. The Process method writes Debug messages and calls Thread.Join with a random timeout to simulate work. On completion, the method raises an event on the dispatcher thread, i.e. the UI thread.

We will also need an equality comparer, to avoid duplicate objects being queued for process. Currently we will simply compare the reference, but equating could be extended in real world classes.

    public class TypeOneTaskEqualityComparer : IEqualityComparer<TypeOneTask>
    {
        public bool Equals(TypeOneTask A, TypeOneTask B)
        {
            if (A != null && B != null)
            {
                return A == B;
            }
            else
            {
                return false;
            }
        }
        public int GetHashCode(TypeOneTask itm)
        {
            return itm.GetHashCode();
        }
    }
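
To see how such a comparer drives a duplicate check, here is a small sketch under assumed names (Job, JobNameComparer, and DuplicateCheckDemo.TryEnqueue are hypothetical and not part of the queue code). It compares by Name instead of by reference, keeps GetHashCode consistent with Equals, and uses the LINQ Contains overload that accepts an IEqualityComparer<T>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical item type, used only for this illustration.
public class Job
{
    public string Name { get; set; }
}

// Two jobs are considered equal when their names match.
public class JobNameComparer : IEqualityComparer<Job>
{
    public bool Equals(Job a, Job b)
    {
        if (ReferenceEquals(a, b)) return true;
        if (a == null || b == null) return false;
        return a.Name == b.Name;
    }

    public int GetHashCode(Job job)
    {
        // Hash on the same field Equals compares, so equal jobs share a hash code.
        return (job == null || job.Name == null) ? 0 : job.Name.GetHashCode();
    }
}

public static class DuplicateCheckDemo
{
    // Enqueues the job only if no equal job is already waiting in the queue.
    public static bool TryEnqueue(Queue<Job> queue, Job job, IEqualityComparer<Job> comparer)
    {
        if (queue.Contains(job, comparer)) return false;   // LINQ overload with a comparer
        queue.Enqueue(job);
        return true;
    }
}
```

With a reference-only comparer, two separate objects named "save" would both be queued; with the name-based comparer above the second one is rejected. Which behavior is right depends on what "duplicate" means for the real-world class.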

Now comes the implementation of our generic ConcurrentQueue class: we inherit from it in a new class named TaskOneQueue. We implement the required methods/properties in the derived class, and in the ProcessRequest method, which is protected and called from the base class, we call the TypeOneTask’s Process method. This is the actual work that runs on the background thread. Here is the code for our TaskOneQueue:

   public class TaskOneQueue : ConcurrentQueue<TypeOneTask>
    {
        #region queue implementations
        TypeOneTaskEqualityComparer _itemEqualityComparer = new TypeOneTaskEqualityComparer();
        public override bool IsQueueAlive
        {
            get
            {
                return base.IsQueueAlive;
            }
        }
        public override IEqualityComparer<TypeOneTask> QueueItemEualityComparer
        {
            get
            {
                return _itemEqualityComparer;
            }
        }
        public override bool IsQueueAvailableForEnqueue()
        {
            if (this.Count > 20)
            {
                return false;
            }
            else
            {
                return true;
            }
        }
        protected override void ProcessRequest(TypeOneTask taskRequest, bool Async)
        {
            taskRequest.Process();
        }
        public override void Exit(bool discardPendingRequests)
        {
            base.Exit(discardPendingRequests);
        }
        public override void DoWhenQueueGetsEmpty()
        {
            //doing nothing
        }
        #endregion
    }

We have our structure taken care of. Let’s put these things into an example form to see how they work. We will create a simple Windows Phone project, and in the ContentPanel element of the default MainPage.xaml we will put a StackPanel containing three buttons:

        <Grid x:Name="ContentPanel" Grid.Row="1" Margin="12,0,12,0">
            <StackPanel>
                <Button Content="Button" Height="69" Margin="110,49,127,0" VerticalAlignment="Top" Click="Button_Click"/>
                <Button Name="buttonDummy1" Content="Dummy 1" Height="69" Margin="110,49,127,0" VerticalAlignment="Top" Click="buttonDummy1_Click"/>
                <Button Name="buttonDummy2" Content="Dummy 2" Height="69" Margin="110,49,127,0" VerticalAlignment="Top" Click="buttonDummy2_Click"/>
            </StackPanel>
        </Grid>

Code behind for this form looks like this:

   public partial class MainPage : PhoneApplicationPage
    {
        TaskOneQueue t1Queue = new TaskOneQueue();
        // Constructor
        public MainPage()
        {
            InitializeComponent();
            this.BackKeyPress += new EventHandler<System.ComponentModel.CancelEventArgs>(MainPage_BackKeyPress);
        }

        void MainPage_BackKeyPress(object sender, System.ComponentModel.CancelEventArgs e)
        {
            t1Queue.Exit(true);
        }

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            TypeOneTask _t1 = new TypeOneTask("Type1 Task - 1");
            TypeOneTask _t2 = new TypeOneTask("Type1 Task - 2");
            TypeOneTask _t3 = new TypeOneTask("Type1 Task - 3");
            _t2.BackgroundProcessFinished += new EventHandler(_t2_BackgroundProcessFinished);
            _t3.BackgroundProcessFinished += new EventHandler(_t3_BackgroundProcessFinished);

            Debug.WriteLine("UI Thread : Task enqueue start");
            t1Queue.Process(_t1, true, false);
            t1Queue.Process(_t2, true, false);
            t1Queue.Process(_t3, true, false);
            Debug.WriteLine("UI Thread : Task enqueue finished. Enqueued Type1 tasks 1, 2, and 3. UI thread is free");
        }

        void _t3_BackgroundProcessFinished(object sender, EventArgs e)
        {
            Debug.WriteLine("UI Thread : All tasks are over!");
        }

        void _t2_BackgroundProcessFinished(object sender, EventArgs e)
        {
            Debug.WriteLine("UI Thread : Task two has finished and UI thread got the message!");
        }

        private void buttonDummy1_Click(object sender, RoutedEventArgs e)
        {
            Debug.WriteLine("UI Thread : Dummy button 1 clicked!");
        }

        private void buttonDummy2_Click(object sender, RoutedEventArgs e)
        {
            Debug.WriteLine("UI Thread : Dummy button 2 clicked!");
        }

    }

In the code above, we have defined a TaskOneQueue object, t1Queue. In the Button_Click event handler we enqueue three TypeOneTask instances and write to the Debug window at the start and end of enqueuing. This shows that the UI thread is free right after the tasks are enqueued. The click handlers buttonDummy1_Click and buttonDummy2_Click demonstrate this further: you can click those buttons at run time to see that the UI thread is free while the tasks are processed in the background. When we run this form in Debug mode and click the first button, followed by Dummy 1 and Dummy 2 at 1-2 second intervals, we get the following Debug output. You can see when the UI thread becomes free and the first TypeOneTask starts processing. In the meantime the two dummy buttons were clicked and their click events were registered without any impact on the background processing.

Output.Debug
UI Thread : Task enqueue start
UI Thread : Task enqueue finished. Enqueued Type1 tasks 1, 2, and 3. UI thread is free
BG Thread : Task Type1 Task - 1 Started @ 04:39:24.483
UI Thread : Dummy button 1 clicked!
BG Thread : Task Type1 Task - 1 Ended @ 04:39:26.436
BG Thread : Task Type1 Task - 2 Started @ 04:39:26.442
UI Thread : Dummy button 2 clicked!
BG Thread : Task Type1 Task - 2 Ended @ 04:39:28.258
BG Thread : Task Type1 Task - 3 Started @ 04:39:28.264
UI Thread : Task two has finished and UI thread got the message!
BG Thread : Task Type1 Task - 3 Ended @ 04:39:30.090
UI Thread : All tasks are over!

In the last post we will create one more queue and run multiple queues simultaneously to see things in action.

continued…part 6

C#|.NET : Generic Concurrent Queue (4/6)

Visit these posts for Part 1, Part 2 & Part 3.

My app has the implementation of this queue. Let’s look at the code of some of the important public and private methods in the class:

The Reset method locks the _queueLocker object, which functions as a mutex, so that only a single operation executes on the queue at a time:

        /// <summary>
        /// Discards all pending requests and deletes all requests in queue, queue is ready for new items to be enqueued after this.
        /// </summary>
        public virtual void Reset()
        {
            lock (_queueLocker)
            {
                RequestQueue.Clear();
            }
        }

The Exit method exits the queue. If discardPendingRequests is true, the queue exits and discards any pending requests; otherwise it exits once the queue is empty. Exit does not guarantee immediate termination: if a task is in process, that task completes first and only then are the remaining tasks discarded.

        /// <summary>
        /// Exits queue.
        /// </summary>
        /// <param name="discardPendingRequests"></param>
        public virtual void Exit(bool discardPendingRequests)
        {
            _exitWhenQueueIsEmpty = true;
            _discardPendingRequests = discardPendingRequests;
            _closeQueue = discardPendingRequests;
            lock (_queueLocker)
            {
                Monitor.Pulse(_queueLocker);
            }
        }

HaltQueueProces (spelled this way in the code) halts the queue and ProceedQueueProcess brings the queue out of the halt.

        /// <summary>
        /// Halts the queue process. 
        /// </summary>
        public virtual void HaltQueueProces()
        {
            _haltQueue = true;
        }
        /// <summary>
        /// Call if queue is halted and you want to restart it.
        /// </summary>
        public void ProceedQueueProcess()
        {
            lock (_queueLocker)
            {
                _haltQueue = false;
                Monitor.Pulse(_queueLocker);
            }
        }
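
The halt/proceed idea can be tried out on its own. The following is a minimal sketch under assumptions, not the queue’s implementation: PauseGate, Halt, Proceed, and WaitWhileHalted are hypothetical names, and the timeout handling is my own addition so that a waiting thread cannot block forever.

```csharp
using System;
using System.Threading;

// Hypothetical pause/resume gate, loosely modeled on the _haltQueue flag.
public class PauseGate
{
    private readonly object _locker = new object();
    private bool _halted;

    public void Halt()
    {
        lock (_locker) { _halted = true; }
    }

    public void Proceed()
    {
        lock (_locker)
        {
            _halted = false;
            Monitor.PulseAll(_locker);   // wake every thread blocked in WaitWhileHalted
        }
    }

    // Blocks while the gate is halted. Returns false if it is still halted after timeoutMs.
    public bool WaitWhileHalted(int timeoutMs)
    {
        DateTime deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
        lock (_locker)
        {
            while (_halted)
            {
                int remaining = (int)(deadline - DateTime.UtcNow).TotalMilliseconds;
                if (remaining <= 0) return false;
                Monitor.Wait(_locker, remaining);   // releases the lock while waiting
            }
            return true;
        }
    }
}
```

A worker loop would call WaitWhileHalted before picking up each item. The flag is re-checked in a while loop after every wake-up, so a pulse that arrives for another reason does not let the worker through while the gate is still halted.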

The Process method is called to enqueue/process a task request. If the Async parameter is true, the task is enqueued and the calling thread is freed. If Async is false, the other tasks in the queue are suspended and the new task is executed on the calling thread, not on the background thread. isRecurssive is meant for process requests that are made from inside the queue’s own processing. If isRecurssive is true, the request is automatically treated as a synchronous request.

        /// <summary>
        /// Processes the request.
        /// </summary>
        /// <param name="request">The request to process</param>
        /// <param name="Async">Is the request Async</param>
        /// <param name="isRecurssive">Is the request Recursive, meaning - was this method called from the Process itself. See example in notes.</param>
        /// <remarks>
        ///  Example 
        ///  Entity.Save 
        ///    `--~IsolatedStorageRequestQueue.Process~-----. 
        ///         `--~Entity.OnEntitySaved     |
        ///             `--~Entity.ContainedEntities.Save -'
        ///             
        /// </remarks>
        public void Process(T request, bool Async, bool isRecurssive)
        {
            if (_closeQueue || request == null) return;
            if (Async && (_processThread == null || !_processThread.IsAlive))
            {
                _processThread = new Thread(new ThreadStart(processQueue));
                _processThread.Name = "Process thread @ " + Environment.TickCount.ToString();
                _processThread.Start();
            }
            // A recursive request is always handled synchronously, even if Async is true.
            if (Async && !isRecurssive)
            {
                enqueueRequest(request);
            }
            else
            {
                if (!isRecurssive)
                {
                    _synchedRequestStatus = RequestStatus.Pending;
                    lock (_processLocker)
                    {
                        _synchedRequestStatus = RequestStatus.InProcess;
                        ProcessRequest(request, false);
                        _synchedRequestStatus = RequestStatus.Finished;
                        Monitor.Pulse(_processLocker);
                    }
                }
                else
                {
                    ProcessRequest(request, false);
                }
            }
        }

The private enqueueRequest method checks whether the queue is full (determined by the abstract method IsQueueAvailableForEnqueue, which is implemented in the derived class). If it is, the method waits for the queue to become available for enqueue. The request is also checked for a duplicate entry before it is enqueued.

        private void enqueueRequest(T request)
        {
            while (QueueIsFull)
            {
                lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Wait(_waitForQueueToBeAvailableForEnqueue, 3000);
                QueueIsFull = !IsQueueAvailableForEnqueue();
            }
            lock (_queueLocker)
            {
                if (!RequestQueue.Contains(request, QueueItemEualityComparer))
                {
                    RequestQueue.Enqueue(request);
                    QueueIsFull = !IsQueueAvailableForEnqueue();
                }
                Monitor.Pulse(_queueLocker);
            }
        }
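
The wait-until-there-is-room idea can also be shown by itself. This is a simplified sketch under assumptions, not the code above: BoundedBuffer<T>, TryEnqueue, and TryDequeue are hypothetical names, a fixed capacity stands in for IsQueueAvailableForEnqueue, and a single lock object is used instead of the two separate ones in the real class.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical bounded buffer; the capacity plays the role of IsQueueAvailableForEnqueue.
public class BoundedBuffer<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public BoundedBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    public int Count
    {
        get { lock (_locker) { return _items.Count; } }
    }

    // Waits up to timeoutMs for room; returns false if the buffer stayed full.
    public bool TryEnqueue(T item, int timeoutMs)
    {
        DateTime deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
        lock (_locker)
        {
            while (_items.Count >= _capacity)
            {
                int remaining = (int)(deadline - DateTime.UtcNow).TotalMilliseconds;
                if (remaining <= 0) return false;
                Monitor.Wait(_locker, remaining);   // releases the lock while waiting
            }
            _items.Enqueue(item);
            return true;
        }
    }

    // Removes the oldest item if there is one and wakes producers waiting for room.
    public bool TryDequeue(out T item)
    {
        lock (_locker)
        {
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            Monitor.PulseAll(_locker);
            return true;
        }
    }
}
```

Unlike enqueueRequest, which keeps retrying every 3 seconds until there is room, this sketch gives up after the timeout and reports failure, so the caller decides whether to retry.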

The private method processQueue takes care of dequeuing tasks. All tasks in the queue are processed in FIFO order. This method and the tasks run on the background thread.

        private void processQueue()
        {
            _isQueueAlive = true;
            while (true)
            {
                T RequestToProcess = default(T);
                bool processthisRequest = false;
                if (RequestQueue.Count == 0)
                {
                    this.DoWhenQueueGetsEmpty();
                    lock (WaitTillQueueGetsEmpty)
                    {
                        Monitor.Pulse(WaitTillQueueGetsEmpty);
                    }
                    lock (WaitTillQueueEmptiesOrHalted)
                    {
                        Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                    }
                }
                if (Monitor.TryEnter(_queueLocker, 1000))
                {
                    try
                    {
                        while (_haltQueue || (RequestQueue.Count == 0 && !_exitWhenQueueIsEmpty && !_discardPendingRequests))
                        {
                            Monitor.Wait(_queueLocker, 7000);
                            if (_haltQueue)
                            {
                                lock (WaitTillQueueEmptiesOrHalted)
                                {
                                    Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                                }
                            }
                        }
                        if (_discardPendingRequests)
                        {
                            break; // jump out of the loop immediately
                        }
                        if (RequestQueue.Count != 0)
                        {
                            try
                            {
                                RequestToProcess = RequestQueue.Dequeue();
                                QueueIsFull = !IsQueueAvailableForEnqueue();
                                if (!QueueIsFull)
                                {
                                    lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Pulse(_waitForQueueToBeAvailableForEnqueue);
                                }
                                processthisRequest = true;
                            }
                            catch (Exception error)
                            {
                                //log system error
                            }
                        }
                    }
                    catch (Exception errorOuter)
                    {
                        //log system error
                    }
                    finally
                    {
                        Monitor.Exit(_queueLocker);
                    }
                }
                else
                {
                    //waiting for more than 1000 ms (the TryEnter timeout above) for dequeuing could be an indication of some problem.
                    //Log system error
                }
                if (processthisRequest)
                {

                    lock (_processLocker)
                    {
                        //Wait for pulse, if some synched request is pending or processing.
                        while (_synchedRequestStatus == RequestStatus.Pending || _synchedRequestStatus == RequestStatus.InProcess)
                        {
                            Monitor.Wait(_processLocker, 3000); //recheck after every 3 seconds.
                        }
                        ProcessRequest(RequestToProcess, true);
                    }
                }
                if (_exitWhenQueueIsEmpty && RequestQueue.Count == 0)
                {
                    if (RequestQueue.Count == 0)
                    {
                        DoWhenQueueGetsEmpty();
                    }
                    break; // jump out of the loop if queue is empty.
                }
            }
            lock (WaitTillQueueEmptiesOrHalted)
            {
                Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
            }
            _isQueueAlive = false;
            lock (WaitTillQueueCloses)
            {
                Monitor.Pulse(WaitTillQueueCloses); //Client systems can wait on this object so as to get signaled when queue exits
            }
        }

Following is the complete code of the class:

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System;

namespace MyAwesomeCompany.Threading.Concurrent
{
    public enum RequestStatus { None, Pending, InProcess, Finished }
    public abstract class ConcurrentQueue<T>
    {
        #region declarations
        public readonly object WaitTillQueueCloses = new object();
        public readonly object WaitTillQueueGetsEmpty = new object();
        public readonly object WaitTillQueueEmptiesOrHalted = new object();
        Thread _processThread;
        bool _haltQueue = false;
        bool _closeQueue = false;
        bool _exitWhenQueueIsEmpty = false;
        bool _discardPendingRequests = false;
        private bool _isQueueAlive = false;
        private RequestStatus _synchedRequestStatus = RequestStatus.None;
        readonly object _queueLocker = new object();
        readonly object _waitForQueueToBeAvailableForEnqueue = new object();
        readonly object _processLocker = new object();
        internal bool QueueIsFull = false; //initially queue is not full
        protected Queue<T> RequestQueue = new Queue<T>();
        #endregion //declarations

        #region properties
        /// <summary>
        /// True if queue is halted.
        /// </summary>
        public bool IsQueueHalted { get { return _haltQueue; } }
        /// <summary>
        /// True indicates that the process thread is spinning - even if queue is halted.
        /// </summary>
        public virtual bool IsQueueAlive { get { return _isQueueAlive; } }
        /// <summary>
        /// True indicates queue is closed for further enqueuing. IsQueueAlive may still be true because existing processes are going on.
        /// </summary>
        public bool IsQueueClosed { get { return _closeQueue; } }
        public int Count { get { return RequestQueue.Count; } }
        public bool IsQueueEmpty { get { return RequestQueue.Count == 0; } }
        #endregion //properties

        public ConcurrentQueue()
        {
        }

        #region public methods
        /// <summary>
        /// Discards all pending requests and deletes all requests in queue, queue is ready for new items to be enqueued after this.
        /// </summary>
        public virtual void Reset()
        {
            lock (_queueLocker)
            {
                RequestQueue.Clear();
            }
        }
        /// <summary>
        /// Exits queue.
        /// </summary>
        /// <param name="discardPendingRequests"></param>
        public virtual void Exit(bool discardPendingRequests)
        {
            _exitWhenQueueIsEmpty = true;
            _discardPendingRequests = discardPendingRequests;
            _closeQueue = discardPendingRequests;
            lock (_queueLocker)
            {
                Monitor.Pulse(_queueLocker);
            }
        }
        /// <summary>
        /// Halts the queue process. 
        /// </summary>
        public virtual void HaltQueueProces()
        {
            _haltQueue = true;
        }
        /// <summary>
        /// Call if queue is halted and you want to restart it.
        /// </summary>
        public void ProceedQueueProcess()
        {
            lock (_queueLocker)
            {
                _haltQueue = false;
                Monitor.Pulse(_queueLocker);
            }
        }
        /// <summary>
        /// Processes the request.
        /// </summary>
        /// <param name="request">The request to process</param>
        /// <param name="Async">Is the request Async</param>
        /// <param name="isRecurssive">Is the request Recursive, meaning - was this method called from the Process itself. See example in notes.</param>
        /// <remarks>
        ///  Example 
        ///  Entity.Save 
        ///    `--~IsolatedStorageRequestQueue.Process~-----. 
        ///         `--~Entity(StopWatch).OnEntitySaved     |
        ///             `--~Entity(ActiveClockRecord).Save -'
        ///             
        /// </remarks>
        public void Process(T request, bool Async, bool isRecurssive)
        {
            if (_closeQueue) return;
            if (Async && (_processThread == null || !_processThread.IsAlive))
            {
                _processThread = new Thread(new ThreadStart(processQueue));
                _processThread.Name = "Process thread @ " + Environment.TickCount.ToString();
                _processThread.Start();
            }
            if (Async)
            {
                enqueueRequest(request);
            }
            else
            {
                if (!isRecurssive)
                {
                    _synchedRequestStatus = RequestStatus.Pending;
                    lock (_processLocker)
                    {
                        _synchedRequestStatus = RequestStatus.InProcess;
                        ProcessRequest(request, false);
                        _synchedRequestStatus = RequestStatus.Finished;
                        Monitor.Pulse(_processLocker);
                    }
                }
                else
                {
                    ProcessRequest(request, false);
                }
            }
        }
        /// <summary>
        /// This method executes as soon as the queue becomes empty. See the remarks for important information.
        /// </summary>
        /// <remarks>
        ///  * It runs on the same thread that executes the other requests in the queue.
        ///  * Currently there is no mechanism to abort this method.
        ///  * So, if a long-running task is performed in this method, a new item in the queue will execute only once this method finishes.
        /// </remarks>
        public virtual void DoWhenQueueGetsEmpty()
        {
            //to be implemented in derived class if required.
        }
        #endregion //public methods

        #region abstract members
        public abstract IEqualityComparer<T> QueueItemEualityComparer { get; }
        public abstract bool IsQueueAvailableForEnqueue();
        protected abstract void ProcessRequest(T Item, bool Async);
        #endregion

        #region private methods
        private void enqueueRequest(T request)
        {
            while (QueueIsFull)
            {
                lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Wait(_waitForQueueToBeAvailableForEnqueue, 3000);
                QueueIsFull = !IsQueueAvailableForEnqueue();
            }
            lock (_queueLocker)
            {
                if (!RequestQueue.Contains(request, QueueItemEualityComparer))
                {
                    RequestQueue.Enqueue(request);
                    QueueIsFull = !IsQueueAvailableForEnqueue();
                }
                Monitor.Pulse(_queueLocker);
            }
        }
        private void processQueue()
        {
            _isQueueAlive = true;
            while (true)
            {
                T RequestToProcess = default(T);
                bool processthisRequest = false;
                if (RequestQueue.Count == 0)
                {
                    this.DoWhenQueueGetsEmpty();
                    lock (WaitTillQueueGetsEmpty)
                    {
                        Monitor.Pulse(WaitTillQueueGetsEmpty);
                    }
                    lock (WaitTillQueueEmptiesOrHalted)
                    {
                        Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                    }
                }
                if (Monitor.TryEnter(_queueLocker, 1000))
                {
                    try
                    {
                        while (_haltQueue || (RequestQueue.Count == 0 && !_exitWhenQueueIsEmpty && !_discardPendingRequests))
                        {
                            Monitor.Wait(_queueLocker, 7000);
                            if (_haltQueue)
                            {
                                lock (WaitTillQueueEmptiesOrHalted)
                                {
                                    Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                                }
                            }
                        }
                        if (_discardPendingRequests)
                        {
                            break; // jump out of the loop immediately
                        }
                        if (RequestQueue.Count != 0)
                        {
                            try
                            {
                                RequestToProcess = RequestQueue.Dequeue();
                                QueueIsFull = !IsQueueAvailableForEnqueue();
                                if (!QueueIsFull)
                                {
                                    lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Pulse(_waitForQueueToBeAvailableForEnqueue);
                                }
                                processthisRequest = true;
                            }
                            catch (Exception error)
                            {
                                //log system error
                            }
                        }
                    }
                    catch (Exception errorOuter)
                    {
                        //log system error
                    }
                    finally
                    {
                        Monitor.Exit(_queueLocker);
                    }
                }
                else
                {
                    //Failing to acquire the queue lock within 1000 ms could be an indication of some problem.
                    //Log system error
                }
                if (processthisRequest)
                {

                    lock (_processLocker)
                    {
                        //Wait for pulse, if some synched request is pending or processing.
                        while (_synchedRequestStatus == RequestStatus.Pending || _synchedRequestStatus == RequestStatus.InProcess)
                        {
                            Monitor.Wait(_processLocker, 3000); //recheck after every 3 seconds.
                        }
                        ProcessRequest(RequestToProcess, true);
                    }
                }
                if (_exitWhenQueueIsEmpty && RequestQueue.Count == 0)
                {
                    DoWhenQueueGetsEmpty();
                    break; // jump out of the loop if queue is empty.
                }
            }
            lock (WaitTillQueueEmptiesOrHalted)
            {
                Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
            }
            _isQueueAlive = false;
            lock (WaitTillQueueCloses)
            {
                Monitor.Pulse(WaitTillQueueCloses); //Client systems can wait on this object so as to get signaled when queue exits
            }
        }
        #endregion
    }
}
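
The heart of enqueueRequest and processQueue is a Monitor.Wait / Monitor.Pulse handshake on a single lock object. To see that handshake without the halt, full-queue and synchronous-request logic around it, here is a minimal sketch. MiniWorkQueue and its members are hypothetical names made up for this illustration, and it processes strings by upper-casing them instead of calling an abstract ProcessRequest:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the article's queue: one lock object guards the
// queue, the producer pulses after enqueuing, the worker waits while it is empty.
public class MiniWorkQueue
{
    private readonly object _locker = new object();
    private readonly Queue<string> _items = new Queue<string>();
    private readonly List<string> _processed = new List<string>();
    private bool _exitWhenEmpty;
    private Thread _worker;

    public void Enqueue(string item)
    {
        lock (_locker)
        {
            // Start the worker lazily, on the first enqueue.
            if (_worker == null)
            {
                _worker = new Thread(Consume);
                _worker.Start();
            }
            _items.Enqueue(item);
            Monitor.Pulse(_locker); // wake the worker if it is waiting
        }
    }

    // Asks the worker to finish once the queue is drained, then waits for it.
    public List<string> ExitAndWait()
    {
        Thread worker;
        lock (_locker)
        {
            _exitWhenEmpty = true;
            Monitor.Pulse(_locker);
            worker = _worker;
        }
        if (worker != null) worker.Join();
        return _processed;
    }

    private void Consume()
    {
        while (true)
        {
            string next;
            lock (_locker)
            {
                // Wait releases the lock and reacquires it before returning.
                while (_items.Count == 0 && !_exitWhenEmpty)
                {
                    Monitor.Wait(_locker);
                }
                if (_items.Count == 0) return; // exit requested and queue drained
                next = _items.Dequeue();
            }
            // Do the work outside the lock so producers are not blocked by it.
            _processed.Add(next.ToUpperInvariant());
        }
    }
}
```

A caller would enqueue a few items and then call ExitAndWait() to collect the results in FIFO order. Unlike the real class, this sketch waits without a timeout and does not restart its worker after an exit.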

In the next posts we will create an example queue derived from ConcurrentQueue and use it in some example code to see how it behaves with foreground and background processing.

Happy queue-ing!

Part 5

C#|.NET : Generic Concurrent Queue (3/6)

Queue by oatsy40, on Flickr

Visit these posts for Part 1 & Part 2

I wrote ConcurrentQueue as a generic abstract class in C# on a limited version of .NET. You can see the complete mechanism at play in my app. The class makes it easy to enqueue process requests on a background thread. The client does not have to deal with threading; it just calls a method to enqueue the requests it wants processed in the background, and the queue works through them in FIFO order. Classes derived from ConcurrentQueue decide the two most important aspects of the queue: the type of the request and the process that handles it. The request type is taken care of by generics, and the process is implemented by the derived class. Process and request could have been implemented with Task and delegates, but I was more comfortable with straightforward methods. In some future version I might update it to use Task.

Let’s take a look at the class diagram:

ClassDiagram

The RequestStatus enum is currently used internally to register the status of a request in the queue. I will not discuss the fields here; we will get to know them when we go through the code. Let’s have a look at the properties and methods:

Properties

  • Count: Returns the number of requests waiting in the queue.
  • IsQueueAlive: Returns a Boolean indicating whether the queue is alive to take/process requests.
  • IsQueueClosed: Returns a Boolean. True indicates the queue is closed to new requests.
  • IsQueueEmpty: Returns a Boolean indicating whether the queue has no requests waiting.
  • IsQueueHalted: Returns a Boolean. True indicates the queue is not processing any request and is waiting to be started again.
  • QueueItemEqualityComparer: The derived class determines equality comparer for the type.
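
As an illustration of what the equality comparer is for, here is a minimal sketch of a comparer and the check-then-enqueue step it supports. SaveRequest, SaveRequestComparer and DedupDemo are hypothetical names invented for this example (the real request types come from the derived queues), and the sketch assumes two requests are "the same" when they target the same entity:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical request type used only for this illustration.
public class SaveRequest
{
    public string EntityId { get; set; }
    public string Payload { get; set; }
}

// Assumption: two requests are equal when they target the same entity.
public class SaveRequestComparer : IEqualityComparer<SaveRequest>
{
    public bool Equals(SaveRequest x, SaveRequest y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x == null || y == null) return false;
        return x.EntityId == y.EntityId;
    }

    public int GetHashCode(SaveRequest obj)
    {
        return obj == null || obj.EntityId == null ? 0 : obj.EntityId.GetHashCode();
    }
}

public static class DedupDemo
{
    // Enqueues the request only if no equal request is already waiting.
    // Returns true if the request was added.
    public static bool EnqueueIfAbsent(Queue<SaveRequest> queue, SaveRequest request,
                                       IEqualityComparer<SaveRequest> comparer)
    {
        // The two-argument Contains is the LINQ extension Enumerable.Contains.
        if (queue.Contains(request, comparer)) return false;
        queue.Enqueue(request);
        return true;
    }
}
```

With this comparer, a second save request for the same entity is dropped while the first one is still waiting, whatever its payload. Whether that is the right notion of equality is a decision for each derived queue.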

Methods

  • ConcurrentQueue : Constructor
  • DoWhenQueueGetsEmpty: Executes on the process thread when queue gets empty. This is a virtual method for derived class to add more functionality.
  • Exit: Client calls this method to signal an exit with appropriate exiting behavior. Queue is not guaranteed to exit immediately. Derived class can extend the logic of this virtual method for exiting queue process.
  • HaltQueueProcess: Client calls this method to halt the background process of the queue. The background thread completes the current request and then waits for a signal (from ProceedQueueProcess) before taking up the next request in the queue. While the queue is halted, new requests can still be enqueued. This is a virtual method and a derived class can extend it.
  • ProceedQueueProcess: Client calls this method to bring the queue out of the halt.
  • IsQueueAvailableForEnqueue: Derived class implements logic in this abstract method to determine whether the queue should be considered available for enqueue.
  • Process: Client calls this method to enqueue a request for background processing, or to process it synchronously on the calling thread.
  • ProcessRequest: The background thread calls this abstract method for processing. Each derived class implements the process for the request.
  • Reset: Queue is emptied and made ready for a new enqueue.
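
The halt/proceed pair described above can be pictured as a small gate that the background thread checks between requests. The following is a minimal sketch under that assumption. HaltGate and its members are hypothetical names, not part of ConcurrentQueue, and unlike the real class it sets the halt flag inside the lock:

```csharp
using System;
using System.Threading;

// Hypothetical gate showing the halt/proceed handshake in isolation.
public class HaltGate
{
    private readonly object _locker = new object();
    private bool _halted;

    public bool IsHalted { get { lock (_locker) { return _halted; } } }

    public void Halt()
    {
        lock (_locker) { _halted = true; }
    }

    public void Proceed()
    {
        lock (_locker)
        {
            _halted = false;
            Monitor.PulseAll(_locker); // wake every thread parked in WaitWhileHalted
        }
    }

    // Called by a worker between requests; blocks while the gate is halted.
    // Returns false if the timeout elapsed while the gate was still halted.
    public bool WaitWhileHalted(int timeoutMs)
    {
        lock (_locker)
        {
            while (_halted)
            {
                // Monitor.Wait returns false when the timeout elapses first.
                if (!Monitor.Wait(_locker, timeoutMs)) return !_halted;
            }
            return true;
        }
    }
}
```

A worker loop would call WaitWhileHalted before dequeuing each request, so the request already in progress always completes, and Halt only takes effect at the next request boundary.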

Next, we will go through the internal code of the class.

Part 4