C#|.NET : Generic Concurrent Queue (4/6)

Visit these posts for Part 1, Part 2 & Part 3.

My app has the implementation of this queue. Let’s look at the code of some of the important public and private methods in the class:

Reset method locks the _queueLocker object, which is functioning as Mutex, making sure only a single operation executes on the queue:

        /// <summary>
        /// Discards all pending requests and deletes all requests in queue, queue is ready for new items to be enqued after this.
        /// </summary>
        public virtual void Reset()
        {
            lock (_queueLocker)
            {
                RequestQueue.Clear();
            }
        }

Exit method exits the queue. If discardPendingRequests is true, queue exits discarding any pending request. Exit does not ensure an immediate termination of queue. If a task is in process, first the task will complete and then remaining task will be discarded.

        /// <summary>
        /// Exits queue.
        /// </summary>
        /// <param name="discardPendingRequests"></param>
        public virtual void Exit(bool discardPendingRequests)
        {
            _exitWhenQueueIsEmpty = true;
            _discardPendingRequests = discardPendingRequests;
            _closeQueue = discardPendingRequests;
            lock (_queueLocker)
            {
                Monitor.Pulse(_queueLocker);
            }
        }

HaltQueueProcess halts the queue and ProceedQueueProcess brings the queue out of the halt.

        /// <summary>
        /// Halts the queue process. 
        /// </summary>
        public virtual void HaltQueueProces()
        {
            _haltQueue = true;
        }
        /// <summary>
        /// Call if queue is halted and you want to restart it.
        /// </summary>
        public void ProceedQueueProcess()
        {
            lock (_queueLocker)
            {
                _haltQueue = false;
                Monitor.Pulse(_queueLocker);
            }
        }

Process method is called to enqueue/process a task request. If Async parameter is true, task is enqueued and calling thread is freed. If Async parameter is false, other tasks in the queue are suspended and this new task is executed. The task executes on the calling thread and not on the background thread. isRecursive is usually used by those process requests which are called from inside the process queue. If isRecursive is true, the request is automatically considered as synched request.

        /// <summary>
        /// Processes the request.
        /// </summary>
        /// <param name="request">The request to process</param>
        /// <param name="async">Is the request Async</param>
        /// <param name="isRecurssive">Is the request Recursive, meaning - was this method called from the Process itself. See example in notes.</param>
        /// <remarks>
        ///  Example 
        ///  Entity.Save 
        ///    `--~IsolatedStorageRequestQueue.Process~-----. 
        ///         `--~Entity.OnEntitySaved     |
        ///             `--~Entity.ContainedEntities.Save -'
        ///             
        /// </remarks>
        public void Process(T request, bool async, bool isRecurssive)
        {
            if (_closeQueue || request == null) return;
            if (Async && (_processThread == null || !_processThread.IsAlive))
            {
                _processThread = new Thread(new ThreadStart(processQueue));
                _processThread.Name = "Process thread @ " + Environment.TickCount.ToString();
                _processThread.Start();
            }
            if (Async && !isRecursive)
            {
                enqueueRequest(request);
            }
            else
            {
                if (!isRecurssive)
                {
                    _synchedRequestStatus = RequestStatus.Pending;
                    lock (_processLocker)
                    {
                        _synchedRequestStatus = RequestStatus.InProcess;
                        ProcessRequest(request, false);
                        _synchedRequestStatus = RequestStatus.Finished;
                        Monitor.Pulse(_processLocker);
                    }
                }
                else
                {
                    ProcessRequest(request, false);
                }
            }
        }

Private enqueuRequest checks if queue is full (determined by abstract method IsQueueAvailableForEnqueue, which is to be implemented in the derived class.), if it is, it waits for queue to be available for enqueue. Also, if the request is checked for duplicate entry before enqueue.

        private void enqueueRequest(T request)
        {
            while (QueueIsFull)
            {
                lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Wait(_waitForQueueToBeAvailableForEnqueue, 3000);
                QueueIsFull = !IsQueueAvailableForEnqueue();
            }
            lock (_queueLocker)
            {
                if (!RequestQueue.Contains(request, QueueItemEualityComparer))
                {
                    RequestQueue.Enqueue(request);
                    QueueIsFull = !IsQueueAvailableForEnqueue();
                }
                Monitor.Pulse(_queueLocker);
            }
        }

Private method processQueue is the internal method which takes care of task dequeue. All the tasks in the queue are processed in FIFO priority. This method and the task run on background thread.

        private void processQueue()
        {
            _isQueueAlive = true;
            while (true)
            {
                T RequestToProcess = default(T);
                bool processthisRequest = false;
                if (RequestQueue.Count == 0)
                {
                    this.DoWhenQueueGetsEmpty();
                    lock (WaitTillQueueGetsEmpty)
                    {
                        Monitor.Pulse(WaitTillQueueGetsEmpty);
                    }
                    lock (WaitTillQueueEmptiesOrHalted)
                    {
                        Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                    }
                }
                if (Monitor.TryEnter(_queueLocker, 1000))
                {
                    try
                    {
                        while (_haltQueue || (RequestQueue.Count == 0 && !_exitWhenQueueIsEmpty && !_discardPendingRequests))
                        {
                            Monitor.Wait(_queueLocker, 7000);
                            if (_haltQueue)
                            {
                                lock (WaitTillQueueEmptiesOrHalted)
                                {
                                    Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                                }
                            }
                        }
                        if (_discardPendingRequests)
                        {
                            break; // jump out of the loop immediately
                        }
                        if (RequestQueue.Count != 0)
                        {
                            try
                            {
                                RequestToProcess = RequestQueue.Dequeue();
                                QueueIsFull = !IsQueueAvailableForEnqueue();
                                if (!QueueIsFull)
                                {
                                    lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Pulse(_waitForQueueToBeAvailableForEnqueue);
                                }
                                processthisRequest = true;
                            }
                            catch (Exception error)
                            {
                                //log system error
                            }
                        }
                    }
                    catch (Exception errorOuter)
                    {
                        //log system error
                    }
                    finally
                    {
                        Monitor.Exit(_queueLocker);
                    }
                }
                else
                {
                    //waiting for more than 500MS for dequeuing could be an indication of some problem.
                    //Log system error
                }
                if (processthisRequest)
                {

                    lock (_processLocker)
                    {
                        //Wait for pulse, if some synched request is pending or processing.
                        while (_synchedRequestStatus == RequestStatus.Pending || _synchedRequestStatus == RequestStatus.InProcess)
                        {
                            Monitor.Wait(_processLocker, 3000); //recheck after every 3 seconds.
                        }
                        ProcessRequest(RequestToProcess, true);
                    }
                }
                if (_exitWhenQueueIsEmpty && RequestQueue.Count == 0)
                {
                    if (RequestQueue.Count == 0)
                    {
                        DoWhenQueueGetsEmpty();
                    }
                    break; // jump out of the loop if queue is empty.
                }
            }
            lock (WaitTillQueueEmptiesOrHalted)
            {
                Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
            }
            _isQueueAlive = false;
            lock (WaitTillQueueCloses)
            {
                Monitor.Pulse(WaitTillQueueCloses); //Client systems can wait on this object so as to get signaled when queue exits
            }
        }

Following is the complete code of the class:

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System;

namespace MyAwesomeCompany.Threading.Concurrent
{
    public enum RequestStatus { None, Pending, InProcess, Finished }
    public abstract class ConcurrentQueue<T>
    {
        #region declarations
        public readonly object WaitTillQueueCloses = new object();
        public readonly object WaitTillQueueGetsEmpty = new object();
        public readonly object WaitTillQueueEmptiesOrHalted = new object();
        Thread _processThread;
        bool _haltQueue = false;
        bool _closeQueue = false;
        bool _exitWhenQueueIsEmpty = false;
        bool _discardPendingRequests = false;
        private bool _isQueueAlive = false;
        private RequestStatus _synchedRequestStatus = RequestStatus.None;
        readonly object _queueLocker = new object();
        readonly object _waitForQueueToBeAvailableForEnqueue = new object();
        readonly object _processLocker = new object();
        internal bool QueueIsFull = false; //intially queue is not full
        protected Queue<T> RequestQueue = new Queue<T>();
        #endregion //declarations

        #region properties
        /// <summary>
        /// True if queue is halted.
        /// </summary>
        public bool IsQueueHalted { get { return _haltQueue; } }
        /// <summary>
        /// True indicates that the process thread is spinning - even if queue is halted.
        /// </summary>
        public virtual bool IsQueueAlive { get { return _isQueueAlive; } }
        /// <summary>
        /// True indicates queue is closed for further enqueing. IsQueuAlive may still be true because existing processes are going on.
        /// </summary>
        public bool IsQueueClosed { get { return _closeQueue; } }
        public int Count { get { return RequestQueue.Count; } }
        public bool IsQueueEmpty { get { return RequestQueue.Count == 0; } }
        #endregion //properties

        public ConcurrentQueue()
        {
        }

        #region public methods
        /// <summary>
        /// Discards all pending requests and deletes all requests in queue, queue is ready for new items to be enqued after this.
        /// </summary>
        public virtual void Reset()
        {
            lock (_queueLocker)
            {
                RequestQueue.Clear();
            }
        }
        /// <summary>
        /// Exits queue.
        /// </summary>
        /// <param name="discardPendingRequests"></param>
        public virtual void Exit(bool discardPendingRequests)
        {
            _exitWhenQueueIsEmpty = true;
            _discardPendingRequests = discardPendingRequests;
            _closeQueue = discardPendingRequests;
            lock (_queueLocker)
            {
                Monitor.Pulse(_queueLocker);
            }
        }
        /// <summary>
        /// Halts the queue process. 
        /// </summary>
        public virtual void HaltQueueProces()
        {
            _haltQueue = true;
        }
        /// <summary>
        /// Call if queue is halted and you want to restart it.
        /// </summary>
        public void ProceedQueueProcess()
        {
            lock (_queueLocker)
            {
                _haltQueue = false;
                Monitor.Pulse(_queueLocker);
            }
        }
        /// <summary>
        /// Processes the request.
        /// </summary>
        /// <param name="request">The request to process</param>
        /// <param name="Async">Is the request Async</param>
        /// <param name="isRecurssive">Is the request Recursive, meaning - was this method called from the Process itself. See example in notes.</param>
        /// <remarks>
        ///  Example 
        ///  Entity.Save 
        ///    `--~IsolatedStorageRequestQueue.Process~-----. 
        ///         `--~Entity(StopWatch).OnEntitySaved     |
        ///             `--~Entity(ActiveClockRecord).Save -'
        ///             
        /// </remarks>
        public void Process(T request, bool Async, bool isRecurssive)
        {
            if (_closeQueue) return;
            if (Async && (_processThread == null || !_processThread.IsAlive))
            {
                _processThread = new Thread(new ThreadStart(processQueue));
                _processThread.Name = "Process thread @ " + Environment.TickCount.ToString();
                _processThread.Start();
            }
            if (Async)
            {
                enqueueRequest(request);
            }
            else
            {
                if (!isRecurssive)
                {
                    _synchedRequestStatus = RequestStatus.Pending;
                    lock (_processLocker)
                    {
                        _synchedRequestStatus = RequestStatus.InProcess;
                        ProcessRequest(request, false);
                        _synchedRequestStatus = RequestStatus.Finished;
                        Monitor.Pulse(_processLocker);
                    }
                }
                else
                {
                    ProcessRequest(request, false);
                }
            }
        }
        /// <summary>
        /// This method executes as soon as the queue gets empty.[See remark for imp info]
        /// <remarks>
        ///  * It runs on the same thread which executes other requests in the queue.
        ///  * Currently there is no mechanism to abort this method.
        ///  * So, if a long running task is performed in this method, the new item in queue will only execute once this method finishes its task.
        /// </remarks>
        /// </summary>
        public virtual void DoWhenQueueGetsEmpty()
        {
            //to be implemented in derived class if required.
        }
        #endregion //public methods

        #region abstract members
        public abstract IEqualityComparer<T> QueueItemEualityComparer { get; }
        public abstract bool IsQueueAvailableForEnqueue();
        protected abstract void ProcessRequest(T Item, bool Async);
        #endregion

        #region private methods
        private void enqueueRequest(T request)
        {
            while (QueueIsFull)
            {
                lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Wait(_waitForQueueToBeAvailableForEnqueue, 3000);
                QueueIsFull = !IsQueueAvailableForEnqueue();
            }
            lock (_queueLocker)
            {
                if (!RequestQueue.Contains(request, QueueItemEualityComparer))
                {
                    RequestQueue.Enqueue(request);
                    QueueIsFull = !IsQueueAvailableForEnqueue();
                }
                Monitor.Pulse(_queueLocker);
            }
        }
        private void processQueue()
        {
            _isQueueAlive = true;
            while (true)
            {
                T RequestToProcess = default(T);
                bool processthisRequest = false;
                if (RequestQueue.Count == 0)
                {
                    this.DoWhenQueueGetsEmpty();
                    lock (WaitTillQueueGetsEmpty)
                    {
                        Monitor.Pulse(WaitTillQueueGetsEmpty);
                    }
                    lock (WaitTillQueueEmptiesOrHalted)
                    {
                        Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                    }
                }
                if (Monitor.TryEnter(_queueLocker, 1000))
                {
                    try
                    {
                        while (_haltQueue || (RequestQueue.Count == 0 && !_exitWhenQueueIsEmpty && !_discardPendingRequests))
                        {
                            Monitor.Wait(_queueLocker, 7000);
                            if (_haltQueue)
                            {
                                lock (WaitTillQueueEmptiesOrHalted)
                                {
                                    Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
                                }
                            }
                        }
                        if (_discardPendingRequests)
                        {
                            break; // jump out of the loop immediately
                        }
                        if (RequestQueue.Count != 0)
                        {
                            try
                            {
                                RequestToProcess = RequestQueue.Dequeue();
                                QueueIsFull = !IsQueueAvailableForEnqueue();
                                if (!QueueIsFull)
                                {
                                    lock (_waitForQueueToBeAvailableForEnqueue) Monitor.Pulse(_waitForQueueToBeAvailableForEnqueue);
                                }
                                processthisRequest = true;
                            }
                            catch (Exception error)
                            {
                                //log system error
                            }
                        }
                    }
                    catch (Exception errorOuter)
                    {
                        //log system error
                    }
                    finally
                    {
                        Monitor.Exit(_queueLocker);
                    }
                }
                else
                {
                    //waiting for more than 500MS for dequeuing could be an indication of some problem.
                    //Log system error
                }
                if (processthisRequest)
                {

                    lock (_processLocker)
                    {
                        //Wait for pulse, if some synched request is pending or processing.
                        while (_synchedRequestStatus == RequestStatus.Pending || _synchedRequestStatus == RequestStatus.InProcess)
                        {
                            Monitor.Wait(_processLocker, 3000); //recheck after every 3 seconds.
                        }
                        ProcessRequest(RequestToProcess, true);
                    }
                }
                if (_exitWhenQueueIsEmpty && RequestQueue.Count == 0)
                {
                    if (RequestQueue.Count == 0)
                    {
                        DoWhenQueueGetsEmpty();
                    }
                    break; // jump out of the loop if queue is empty.
                }
            }
            lock (WaitTillQueueEmptiesOrHalted)
            {
                Monitor.Pulse(WaitTillQueueEmptiesOrHalted);
            }
            _isQueueAlive = false;
            lock (WaitTillQueueCloses)
            {
                Monitor.Pulse(WaitTillQueueCloses); //Client systems can wait on this object so as to get signaled when queue exits
            }
        }
        #endregion
    }
}

In next posts we will create an example derived queue from ConcurrentQueue and use the queue in some example code to see the effect in foreground+background process.

Happy queue-ing!

Part 5

3 thoughts on “C#|.NET : Generic Concurrent Queue (4/6)

  1. Pingback: C#|.NET : Generic Concurrent Queue (3/n) | Sharp Statements

  2. Pingback: C#|.NET : Generic Concurrent Queue (5/n) | Sharp Statements

  3. Pingback: C#|.NET : Generic Concurrent Queue (6/6) | Sharp Statements

Leave a comment