Re: thread-safety
- From: "Peter Duniho" <NpOeStPeAdM@xxxxxxxxxxxxxxxx>
- Date: Sat, 17 Jan 2009 16:01:51 -0800
On Thu, 15 Jan 2009 13:06:13 -0800, Peter Duniho <NpOeStPeAdM@xxxxxxxxxxxxxxxx> wrote:
> [...]
> I've started to catch up with things, and have some time to look at this again. I actually think at this point it might be most productive for me to just put together a basic code sample that illustrates an appropriate synchronization implementation that meets your needs. You'd still need to integrate it with whatever else you're doing, but at least it'd provide a framework to start with.
Okay...well, I didn't see any reply yet, but I did have a little time to put something together. Sorry for the sparse comments in the code; I focused on the parts that I thought might be the most non-intuitive and ignored the rest.
This is an implementation of a consumer that executes IJob instances. It supports immediate addition to the consumer's queue, and creation of a repeating timer that will periodically add to the consumer's queue. You can clear all the timers at once.
It does _not_ wait for the current job to finish executing when pausing the consumer thread, nor is there a way to remove individual timers. These would not be difficult to add, but so far there hasn't been anything in this message thread that would suggest to me they were desired, so I didn't bother. (Specifically, while you did ask about having the pause method block until the current job has finished, as near as I can tell that's an artificial requirement due to your impression of how the synchronization has to work.)
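For what it's worth, here is a rough sketch of how a blocking pause could be layered on the same single-lock approach, should it turn out to be wanted. Everything in it is hypothetical and not part of the sample: the class name PausableWorkerSketch, the _busy flag, and the BeginJob/EndJob/PauseAndWait methods are assumptions made for illustration only. The idea is that the consumer thread brackets each job with BeginJob and EndJob, and the pausing thread waits on the same lock object until the busy flag clears.

```csharp
using System;
using System.Threading;

// Hypothetical sketch only; none of these names exist in the sample.
class PausableWorkerSketch
{
    private readonly object _lock = new object();
    private bool _paused;
    private bool _busy;

    // Consumer thread calls this before running a job; blocks while paused.
    public void BeginJob()
    {
        lock (_lock)
        {
            while (_paused)
            {
                Monitor.Wait(_lock);
            }
            _busy = true;
        }
    }

    // Consumer thread calls this after the job has finished.
    public void EndJob()
    {
        lock (_lock)
        {
            _busy = false;
            Monitor.PulseAll(_lock);
        }
    }

    // Sets the paused flag, then blocks until any job in progress is done.
    public void PauseAndWait()
    {
        lock (_lock)
        {
            _paused = true;
            while (_busy)
            {
                Monitor.Wait(_lock);
            }
        }
    }

    public void Resume()
    {
        lock (_lock)
        {
            _paused = false;
            Monitor.PulseAll(_lock);
        }
    }
}
```

PulseAll is used rather than Pulse because two different kinds of waiter (the consumer and a pausing caller) share the one lock object, and each re-checks its own condition in a loop after waking.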
I hope that this gives you some ideas for how to implement your own code. In particular, note that there isn't any need for WaitHandles at all, and a single object used with the Monitor class is sufficient for all the locking needs. This isn't the fastest thread-safe code, but it's reasonably simple and should perform fine in most typical scenarios, and best of all it works (as far as I know :) ).
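As a minimal illustration of that single-lock idea on its own, here is a stripped-down blocking queue. The names (BlockingQueueSketch, TryTake, Close) are hypothetical and do not appear in the sample; this is a sketch of the general Monitor.Wait/Monitor.Pulse pattern, not a replacement for the consumer class. Unlike the sample's consumer loop, it re-checks its condition in a while loop without leaving the lock after each wait.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch only; not part of the sample.
class BlockingQueueSketch<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _lock = new object();
    private bool _closed;

    public void Add(T item)
    {
        lock (_lock)
        {
            _items.Enqueue(item);
            // Wake one waiting consumer, if there is one.
            Monitor.Pulse(_lock);
        }
    }

    public void Close()
    {
        lock (_lock)
        {
            _closed = true;
            // Wake every waiter so each can observe the closed flag.
            Monitor.PulseAll(_lock);
        }
    }

    // Blocks until an item is available. Returns false once the queue
    // has been closed and drained.
    public bool TryTake(out T item)
    {
        lock (_lock)
        {
            // Wait releases the lock while blocked and re-acquires it
            // before returning, so the condition is always tested with
            // the lock held.
            while (_items.Count == 0 && !_closed)
            {
                Monitor.Wait(_lock);
            }
            if (_items.Count > 0)
            {
                item = _items.Dequeue();
                return true;
            }
            item = default(T);
            return false;
        }
    }
}
```

A consumer thread would simply loop on TryTake until it returns false; any thread can call Add or Close.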
Pete
using System;
using System.Collections.Generic;
using System.Threading;
namespace TestPausableConsumer
{
/* Specification:
*
* -- Consumer thread, executes one job at a time
* -- IJob interface with Execute method
* -- Add jobs as immediate, or timed/repetitive
* -- Timed/repetitive jobs are queued on timer,
* execute in order with existing jobs
* -- Consumer can be paused
* -- Timers are paused when consumer is
* -- Can clear entire timer list
*
* Input keys:
* <Spacebar> -- add a new job
* <P> -- pause the consumer thread
* <T> -- add a timer
* <C> -- clear all the timers
* <X> -- exit the program
*/
class Program
{
static void Main(string[] args)
{
TimeSpan tsJobMin = new TimeSpan(0, 0, 1),
tsJobMax = new TimeSpan(0, 0, 5);
TimeSpan tsTimerMin = new TimeSpan(0, 0, 2),
tsTimerMax = new TimeSpan(0, 0, 4);
using (JobConsumer jobc = new JobConsumer())
{
bool fDone = false,
fPaused = false;
while (!fDone)
{
ConsoleKeyInfo cki = Console.ReadKey(true);
switch (cki.KeyChar)
{
case ' ':
jobc.Add(new TestJob(tsJobMin, tsJobMax));
break;
case 'P':
case 'p':
if (fPaused)
{
jobc.Resume();
}
else
{
jobc.Pause();
}
fPaused = !fPaused;
break;
case 'X':
case 'x':
fDone = true;
break;
case 'T':
case 't':
{
TimeSpan tsInterval = StaticRandom.Next(tsTimerMin, tsTimerMax);
jobc.Add(new TestJob(new TimeSpan(tsInterval.Ticks / 10)), tsInterval);
}
break;
case 'C':
case 'c':
jobc.ClearTimerJobs();
break;
default:
Console.WriteLine("(unknown key input)");
break;
}
}
}
Console.WriteLine("Done processing jobs");
Console.ReadLine();
}
}
static class StaticRandom
{
static Random _rnd = new Random();
public static int Next(int iMax)
{
return _rnd.Next(iMax);
}
public static TimeSpan Next(TimeSpan tsMin, TimeSpan tsMax)
{
return tsMin + new TimeSpan(0, 0, 0, 0, Next((int)(tsMax - tsMin).TotalMilliseconds));
}
}
interface IJob
{
void Execute();
}
class TestJob : IJob
{
private TimeSpan _tsExecute;
public TestJob(TimeSpan tsExecute)
{
_tsExecute = tsExecute;
}
public TestJob(TimeSpan tsMin, TimeSpan tsMax) :
this(StaticRandom.Next(tsMin, tsMax))
{
}
#region IJob Members
void IJob.Execute()
{
Thread.Sleep(_tsExecute);
Console.WriteLine("Job executed");
}
#endregion
}
class JobConsumer : IDisposable
{
private Queue<IJob> _qjob = new Queue<IJob>();
private List<TimerJob> _ltjob = new List<TimerJob>();
private object _objLock = new object();
private ConsumerState _qs = ConsumerState.Running;
/// <summary>
/// Encapsulates the logic to manage an individual timer-based
/// job. Supports pausing in the middle of an interval; on
/// resume, delays only for as long as is needed to complete the
/// interval that was interrupted.
///
/// This class is _not_ thread-safe, even though it has some
/// synchronization embedded. Client code must synchronize
/// access to instances of this class to avoid unpredictable
/// results.
/// </summary>
private class TimerJob : IDisposable
{
private TimeSpan _tsInterval;
private JobConsumer _jobc;
private IJob _job;
private TimeSpan _tsPaused;
private DateTime _dtLastElapsed = DateTime.Now;
private bool _fPaused;
private object _objLock = new object();
private Timer _timer;
/// <summary>
/// Constructor
/// </summary>
/// <param name="tsInterval">Repeating time interval. Each time it elapses, the given job will be added to the consumer's queue.</param>
/// <param name="jobc">The job consumer to use</param>
/// <param name="job">The job to add to the consumer's queue when the interval elapses</param>
public TimerJob(TimeSpan tsInterval, JobConsumer jobc, IJob job)
{
_tsInterval = tsInterval;
_jobc = jobc;
_job = job;
_timer = new Timer(_Elapsed);
_RestartTimer();
}
/// <summary>
/// Set to true to pause the timer, false to resume.
/// </summary>
public bool Paused
{
get { return _fPaused; }
set
{
if (_fPaused != value)
{
if (value)
{
_SuspendTimer();
}
else
{
_RestartTimer();
}
_fPaused = value;
}
}
}
private void _SuspendTimer()
{
// Calculate how much of the interval we've used already
lock (_objLock)
{
_tsPaused = DateTime.Now - _dtLastElapsed;
}
// If it's been longer than the interval, count it as
// exactly the same length so that when we resume, the
// due time is not negative.
if (_tsPaused > _tsInterval)
{
_tsPaused = _tsInterval;
}
_timer.Change(Timeout.Infinite, Timeout.Infinite);
}
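private void _RestartTimer()
{
// Treat the interval as having started _tsPaused ago, so that a
// second pause within the same interval counts only the time the
// timer was actually running, not the time spent paused.
lock (_objLock)
{
_dtLastElapsed = DateTime.Now - _tsPaused;
}
_timer.Change(_tsInterval - _tsPaused, _tsInterval);
}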
private void _Elapsed(object objState)
{
Console.WriteLine("(timer elapsed)");
// WARNING! Note that we have two locks in play here.
// The TimerJob class uses one to protect the _dtLastElapsed
// field, and the JobConsumer has its own. Because the JobConsumer
// class may call into this class while it's holding its own
// lock, it is _vitally_ important that this class _never_ call
// the JobConsumer class while holding its own lock.
//
// There's a very good reason the call to Add() is outside the lock
// here. Don't change that.
lock (_objLock)
{
_dtLastElapsed = DateTime.Now;
}
_jobc.Add(_job);
}
#region IDisposable Members
public void Dispose()
{
_timer.Dispose();
}
#endregion
}
private enum ConsumerState
{
Running, Paused, Exit
}
public JobConsumer()
{
new Thread(_Thread).Start();
}
private void _Thread()
{
Console.WriteLine("Consumer thread started");
while (true)
{
IJob job = null;
// NOTE: this is a simple implementation, with a potential
// performance issue. Specifically, for best performance
// the code would not actually release the lock after waiting
// if the queue had a non-zero count and the state is "Running".
// For many applications, this won't matter and to address that
// issue would complicate the code. So in the interest of
// simplicity, I've left that optimization out.
lock (_objLock)
{
if (_qs == ConsumerState.Exit)
{
break;
}
if (_qs == ConsumerState.Running && _qjob.Count > 0)
{
Console.WriteLine("{0} jobs left", _qjob.Count);
job = _qjob.Dequeue();
}
else
{
Monitor.Wait(_objLock);
}
}
if (job != null)
{
job.Execute();
}
}
ClearTimerJobs();
Console.WriteLine("Consumer thread ended");
}
public void Add(IJob job)
{
lock (_objLock)
{
_qjob.Enqueue(job);
Monitor.Pulse(_objLock);
}
Console.WriteLine("Job added");
}
public void Add(IJob job, TimeSpan tsInterval)
{
TimerJob tjob = new TimerJob(tsInterval, this, job);
lock (_objLock)
{
_ltjob.Add(tjob);
}
Console.WriteLine("Timer job added");
}
public void Pause()
{
lock (_objLock)
{
if (_qs != ConsumerState.Paused)
{
_qs = ConsumerState.Paused;
foreach (TimerJob tjob in _ltjob)
{
tjob.Paused = true;
}
}
}
Console.WriteLine("Pausing");
}
public void Resume()
{
lock (_objLock)
{
if (_qs != ConsumerState.Running)
{
_qs = ConsumerState.Running;
foreach (TimerJob tjob in _ltjob)
{
tjob.Paused = false;
}
Monitor.Pulse(_objLock);
}
}
Console.WriteLine("Resuming");
}
public void ClearTimerJobs()
{
lock (_objLock)
{
while (_ltjob.Count > 0)
{
_ltjob[_ltjob.Count - 1].Dispose();
_ltjob.RemoveAt(_ltjob.Count - 1);
}
}
Console.WriteLine("Cleared timer jobs");
}
#region IDisposable Members
// NOTE: Strictly speaking, Dispose() is used a little inconsistently
// with the usual .NET model. For one, it can block on the lock, and
// for another we're not really using it to release unmanaged resources.
// The one motivation for using IDisposable instead of just having some
// non-standard "Quit", "Exit", "Stop", etc. method is so that we can
// use our object in a "using" statement.
// NOTE: in addition to the above, note that in spite of implementing
// Dispose(), there's not really any point in implementing a finalizer.
// The thread entry point is an instance method, which means this class
// can never wind up collectable until the thread has been explicitly
// stopped by the client. Thus, a finalizer would never actually be
// called, even if the client got rid of its last reference to the
// instance without stopping the thread (via Dispose() in this case).
public void Dispose()
{
lock (_objLock)
{
_qs = ConsumerState.Exit;
Monitor.Pulse(_objLock);
}
}
#endregion
}
}