Re: thread-safety

Tech-Archive recommends: Repair Windows Errors & Optimize Windows Performance



On 13 Jan, 08:18, "Peter Duniho" <NpOeStPe...@xxxxxxxxxxxxxxxx> wrote:
On Wed, 07 Jan 2009 01:14:32 -0800, RedLars <Liverpool1...@xxxxxxxxx>  
wrote:

After some consideration I agree that the current queue implementation
and blocking stop is causing me problem and have decided to remove
both requirements. [...]

I left this reply to the last, because it seems that the code you posted  
should work fine (notwithstanding the timer issue, which is new :) ).

Your most recent post implies that you are still hoping for this "block  
until everything's stopped" behavior.  So I'm a little confused about  
which way you're trying to go.  If you still want that behavior, then we  
can look at solutions for that, but I'm going to leave things as they are  
for the moment because I'm still catching up and have plenty of stuff to  
think about already.  :)

Pete

Hi Peter,

I must apologize for the confusion that I have caused. After some
initial testing I realised that I needed a blocking Stop() method to
guarantee that no tasks are processed in the consumer thread after Stop
() has been executed and then not run the risk of using disposed
resource objects.

Below is the current code that I’m testing at the moment. This code
contains some minor changes to variable names since my previous posts,
and I started to use dotnet's queue collection with lock protection.
There are currently three minor issues I have with the current code:
1. Dispose(bool) will cause an exception in the consumer thread when
cleaning up the EventWaitHandlers. Disposing of the EventWaitHandlers
is executed before the consumer thread manages to exit the loop in
normal manner (via setting TerminatedWorkerThread etc).
2. In the background thread, using ‘continue’ to ‘escape’ the
resStartWaitHandle is bit iffy.
3. If a job processed by the consumer‘s background thread tries to
executed ThreadQueue.Stop() this will lead to a deadlock. I’m not sure
it is the responsibility of ThreadQueue to handle such circumstance or
the developer using the class.

Here is the code.

namespace na
{
public class ThreadQueue : IDisposable
{
private enum WorkerThreadState { Created, Started, Stopped };
private volatile WorkerThreadState state;
private object stateLocker = new object();
private Queue<IJob> queue = new Queue<IJob>();
private object queueLocker = new object();
private EventWaitHandle waitHandle = new ManualResetEvent
(false);
private EventWaitHandle stoppingWaitHandle = new
ManualResetEvent(false);
private EventWaitHandle reStartWaitHandle = new AutoResetEvent
(false);
private Thread thread;
private volatile bool TerminatedWorkerThread;
private List<Timer> timerList = new List<Timer>();
private string threadName;
private bool disposed;

public ThreadQueue () : this("ThreadQueue") { }

public ThreadQueue (string threadName)
{
this.threadName = threadName;
state = WorkerThreadState.Created;
}

~ThreadQueue () { Dispose(false); }

public void Start()
{
lock (stateLocker)
{
if (state == WorkerThreadState.Created)
{
state = WorkerThreadState.Started;
thread = new Thread(new ThreadStart
(this.Thread_Main));
thread.Name = threadName;
thread.Start();
}
else if (state == WorkerThreadState.Stopped)
{
state = WorkerThreadState.Started;
reStartWaitHandle.Set();
}
}
}

public void Stop()
{
lock (stateLocker)
{
if (state == WorkerThreadState.Stopped) return;
state = WorkerThreadState.Stopped;
}
waitHandle.Set();
stoppingWaitHandle.WaitOne();
}

public void Reset()
{
lock (stateLocker) if (state != WorkerThreadState.Stopped)
throw new WorkThreadException("Reseting workThread before
stopping."); ;
DisposeTimers();
lock (queueLocker)
{
queue.Clear();
}
}

public void Add(IJob job)
{
lock (stateLocker)
{
if (state == WorkerThreadState.Stopped) return;
}
lock (queueLocker)
{
queue.Enqueue(job);
}
lock (stateLocker)
{
if (state == WorkerThreadState.Started) waitHandle.Set
();
}
}

public void AddPeriodic(IJob job, int intervalTime)
{
Timer timer = new Timer(new TimerCallback(InternalAdd),
job, intervalTime, intervalTime);
lock (timerList)
{
timerList.Add(timer);
}
}

private void DisposeTimers()
{
lock (timerList)
{
foreach (Timer timer in timerList)
{
timer.Dispose();
}
timerList.Clear();
}
}

private void Thread_Main()
{
while (!TerminatedWorkerThread)
{
IJob job = null;
lock (stateLocker)
{
if (state == WorkerThreadState.Started)
{
lock (queueLocker)
{
if (queue.Count > 0)
job = queue.Dequeue();

if (queue.Count == 0)
waitHandle.Reset();
}
}
}
if (job != null)
{
job. Execute();
}
waitHandle.WaitOne();

lock (stateLocker)
{
if (state == WorkerThreadState.Stopped)
{
stoppingWaitHandle.Set();
}
else
{
continue;
}
}
reStartWaitHandle.WaitOne();
}
}

/* Timer callback */
private void InternalAdd(object obj)
{
lock (stateLocker)
{
if (state == WorkerThreadState.Stopped) return;
}
BeginAddWork(obj as IJob);
}

public void Dispose()
{
Stop();
Dispose(true);
GC.SuppressFinalize(true);
}

protected virtual void Dispose(bool disposeManagedResources)
{
if (!(disposed))
{
if (disposeManagedResources)
{
/* In stopped state at this point - now exit
background thread loop */
this.TerminatedWorkerThread = true;
reStartWaitHandle.Set();

/* Release managed resources */
DisposeTimers();
waitHandle.Close();
stoppingWaitHandle.Close();
reStartWaitHandle.Close();
}
}
this.disposed = true;
}
}
}



.



Relevant Pages

  • Re: thread-safety
    ... It supports immediate addition to the consumer's queue, and creation of a repeating timer that will periodicially add to the consumer's queue. ... private TimeSpan _tsExecute; ... // the JobConsumer class while holding it's own lock. ... public void Add ...
    (microsoft.public.dotnet.languages.csharp)
  • Re: .net 2.0 : looking for a "best practice" for multi threading jobs
    ... private int m_numberOfThreads; ... private object m_numberOfThreadsLockObject = new object; ... lock ... public void AddAction ...
    (microsoft.public.dotnet.framework)
  • Re: Hashtable updates to disk
    ... There may be frequent updates to the Hashtable and I want to avoid constant small update disk writes. ... private final Serializable obj; ... public PeriodicDumper(Serializable obj, Object lock, File file, long interval) { ... public void dump() throws IOException { ...
    (comp.lang.java.programmer)
  • Locking objects in an array
    ... I have a bidimensional array of objects (view it as a 2D lattice). ... Thus I want to lock the 4 objects of the region, ... public void increment{ ... private static final Random rnd = new Random; ...
    (comp.lang.java.programmer)
  • Re: thread-safety
    ... lock before another thread executes queue.Wakeup in Stop, ... internal void Enqueue ... private Thread thread; ... private object stateLocker = new object; ...
    (microsoft.public.dotnet.languages.csharp)