我有一个集合,比如string[] persons = {"person1","person2","person3"};
我希望是这样来使用:
一个"person"一次只能处理一个业务,当多个线程来进来时,第一个线程用了"person1",其他线程不能再使用它,那么第二个线程进来时它只能去用"person2",依次类推。当所有的"person"都被使用时,后进来的线程等待某一个"person"被释放后再去使用。我曾希望Lock能做到这点,但发现一旦lock掉,那么只有一个线程在执行,其他所有的线程都在等待,这样永远都只会用到"person1",其他的"person"都不会被用到,这样太耗时了!我也想过用ThreadPool, static int index = 0;
static string[] persons = {"person1","person2","person3"}; static void Main(string[] args)
{
for (int threadIndex = 0; threadIndex < 10; threadIndex++)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(GetPos));
}
} static void GetPos()
{
//获取person
string usedPerson = persons[index];
//doing sth
...
index++;
}
但这也有问题,因为一个"person"只能同时处理一项业务,所以当所有的person都被使用时,我需要等待某一个被释放,但我好像没法判断哪个"person"被用过释放掉了。
各位老大,有啥好方法啊,我挺菜的,如果可能,给点代码哈!
我希望是这样来使用:
一个"person"一次只能处理一个业务,当多个线程来进来时,第一个线程用了"person1",其他线程不能再使用它,那么第二个线程进来时它只能去用"person2",依次类推。当所有的"person"都被使用时,后进来的线程等待某一个"person"被释放后再去使用。我曾希望Lock能做到这点,但发现一旦lock掉,那么只有一个线程在执行,其他所有的线程都在等待,这样永远都只会用到"person1",其他的"person"都不会被用到,这样太耗时了!我也想过用ThreadPool, static int index = 0;
static string[] persons = {"person1","person2","person3"}; static void Main(string[] args)
{
for (int threadIndex = 0; threadIndex < 10; threadIndex++)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(GetPos));
}
} static void GetPos()
{
//获取person
string usedPerson = persons[index];
//doing sth
...
index++;
}
但这也有问题,因为一个"person"只能同时处理一项业务,所以当所有的person都被使用时,我需要等待某一个被释放,但我好像没法判断哪个"person"被用过释放掉了。
各位老大,有啥好方法啊,我挺菜的,如果可能,给点代码哈!
http://blog.csdn.net/zhoufoxcn/archive/2008/05/17/2453803.aspx
信号量存储了空闲的指针;当某一person被使用时,则从信号量中移除此指针,等任务完成后,再交还此指针,这样就可以能过信号量来取得空闲的位置了;
还有一种方法,存储Person类的数组,每个Person实例都有一个状态标识其是否在工作;再用一信号量存储空闲的个数,当信号量不为零时则遍历Person数组找到空闲的Person实例即可,否则让线程一直处于等待状态直到有线程释放了资源
信号量存储了空闲的指针;当某一person被使用时,则从信号量中移除此指针,等任务完成后,再交还此指针,这样就可以能过信号量来取得空闲的位置了;
还有一种方法,存储Person类的数组,每个Person实例都有一个状态标识其是否在工作;再用一信号量存储空闲的个数,当信号量不为零时则遍历Person数组找到空闲的Person实例即可,否则让线程一直处于等待状态直到有线程释放了资源
Queue<Person> personQueue; void StartProcessing()
{
Person[] persons = ...; personQueue = new Queue<Person>(persons);
for (int i = 0; i < 8; i++)
{
ThreadPool.QueueUserWorkItem( ProcessPerson );
}
} void ProcessPerson(object state)
{
while (true)
{
Person person = null;
lock (this.personQueue)
{
if (this.personQueue.Count > 0) person = personQueue.Dequeue();
else return;
} person.DoSomething();
}
}
Semaphore pool; void StartProcessing()
{
Person[] persons = ...; personQueue = Queue.Synchronized(new Queue<Person>(persons));
pool = new Semaphore(personQueue.Count, personQueue.Count);
for (int i = 0; i < 8; i++)
{
ThreadPool.QueueUserWorkItem( ProcessPerson );
}
} void ProcessPerson(object state)
{
pool.WaitOne();
person = personQueue.Dequeue();
person.DoSomething();
personQueue.Enqueue(person);
pool.Release();
}
C# .net 多线程中集合数据同步(转)
集合类通常不是线程安全的,多个阅读器可以安全的读取集合.但是对集合的任何修改都将为访问集合的所有线程生成不明确的结果.使用以下任何方法都可以令集合类是线程安全的
(1) 使用Synchronized 方法,则从该类派生包装,并通过该包装以独占方式访问集合
(2) 如果该类没有Synchronized 方法,则从该类派生并使用SyncRoot属性实现Synchronized 方法.
(3) 在访问该集合时对SyncRoot属性使用锁定机制
这一段时间在公司做多线程的东西比较多,所以把一些心得写了下来,对关注这一块的朋友有个提示作用.
大家可以看看以下代码:
class Program
{
static void Main(string[] args)
{
Program pg = new Program();
//写线程
Thread t1 = new System.Threading.Thread(new ThreadStart(pg.t1fun));
// 读线程
Thread t2 = new System.Threading.Thread(new ThreadStart(pg.t2fun));
//删线程
Thread t3 = new System.Threading.Thread(new ThreadStart(pg.t3fun));
t1.Start();
t2.Start();
t3.Start();
}
ArrayList arraylist = new ArrayList();
public void t1fun()
{
while (true)
{
arraylist.Add("t1--写入");
System.Console.Out.WriteLine("写入");
System.Threading.Thread.Sleep(1000);
}
}
public void t2fun()
{
while (true)
{
for (int i = arraylist.Count - 1; i >= 0; i--)
{
System.Console.Out.WriteLine("t2读取:"+(string)arraylist[i]);
}
System.Threading.Thread.Sleep(1000);
}
}
public void t3fun()
{
while (true)
{
for (int i = arraylist.Count - 1; i >= 0; i--)
{
arraylist.RemoveAt(i);
System.Console.Out.WriteLine("t3删除:t1"+i.ToString());
}
System.Threading.Thread.Sleep(1000);
}
}
}
这个测试程序得简单,大家一看就明白了你可以运行一下看看,程序一会就挂了,揭示异常:
未处理的异常: System.ArgumentOutOfRangeException: 索引超出范围。必须为非负值并
小于集合大小。
这就是因为多线程中对共享的集合资源同步引起的
下面是改后的代码:
class Program
{
static void Main(string[] args)
{
Program pg = new Program();
//写线程
Thread t1 = new System.Threading.Thread(new ThreadStart(pg.t1fun));
// 读线程
Thread t2 = new System.Threading.Thread(new ThreadStart(pg.t2fun));
//删线程
Thread t3 = new System.Threading.Thread(new ThreadStart(pg.t3fun));
t1.Start();
t2.Start();
t3.Start();
}
ArrayList arraylist = new ArrayList();
public void t1fun()
{
while (true)
{
lock (arraylist.SyncRoot)
{
arraylist.Add("t1--写入");
}
System.Console.Out.WriteLine("写入");
System.Threading.Thread.Sleep(1000);
}
}
public void t2fun()
{
while (true)
{
lock (arraylist.SyncRoot)
{
for (int i = arraylist.Count - 1; i >= 0; i--)
{
System.Console.Out.WriteLine("t2读取:" + (string)arraylist[i]);
}
}
System.Threading.Thread.Sleep(1000);
}
}
public void t3fun()
{
while (true)
{
lock (arraylist.SyncRoot)
{
for (int i = arraylist.Count - 1; i >= 0; i--)
{
arraylist.RemoveAt(i);
System.Console.Out.WriteLine("t3删除:t1" + i.ToString());
}
}
System.Threading.Thread.Sleep(1000);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;namespace SemaphoreDemo
{
public class ThreadClass//声明一个类
{
public IList<PersonWorker> PersonList { get; set; }//传入共享资源
private Semaphore se = null;//信号量 public ThreadClass(Semaphore Semaphore)//创建类实例
{
se = Semaphore;
} public void Work()//让类开始工作
{
try
{
Console.WriteLine(Name + " Wait Semaphore " + DateTime.Now.ToString("HH:mm:ss"));
se.WaitOne();//等待信号量
Console.WriteLine(Name + " Got Semaphore " + DateTime.Now.ToString("HH:mm:ss")); PersonWorker Worker = null;//准备获取工作者
lock (PersonList)//锁定共享资源,若到这一步,则说明一定存在共享资源,否则会由信号量一直等待直到有线程释放了共享资源
{
foreach (PersonWorker pw in PersonList)
{
if (!pw.IsWorking)//
{
Worker = pw;//取得空闲的共享资源
break;
}
}
}
Worker.Run();//开始工作; }
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
Console.WriteLine(Name + " Release Semaphore " + DateTime.Now.ToString("HH:mm:ss"));
Console.WriteLine("*************************");
Console.WriteLine(""); se.Release();//释放信号量
}
} public string Name { get; set; }
} public class PersonWorker
{
private bool m_IsWorking = false;
public bool IsWorking { get { return m_IsWorking; } }
public string Name { get; set; } static Random r = null;
static PersonWorker()
{
r = new Random();
} public void Run()
{
int l = r.Next(10, 10000);//随机生成工作的时间,
Console.WriteLine("Thread=" + Name + " Time=" + l.ToString());
m_IsWorking = true;
Thread.Sleep(l);//模拟工作的时间,
m_IsWorking = false;//工作完成,
} }
}下面是主程序:using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;namespace SemaphoreDemo
{
class Program
{
private static Semaphore se;//信号量
private static List<PersonWorker> PersonList;//共享资源
private static int i = 0;//创建的线程个数 private static TextWriter tw = null;//把结果写入到当前目录的文本文件,以便于分析程序是否正确 static void Main(string[] args)
{
PersonList = new List<PersonWorker>() {
new PersonWorker(){Name="P0"},
new PersonWorker(){Name="P1"},
new PersonWorker(){Name="P2"}
};//创建共享资源 se = new Semaphore(PersonList.Count, PersonList.Count);//创建信号量
tw = new StreamWriter("log" + DateTime.Now.ToString("ddHHmmss") + ".txt");
Console.SetOut(tw);// Timer t = new Timer(ThreadCreator);//创建一定时器
t.Change(1000, 1000);//1秒触发一次
Console.ReadKey();
t.Dispose();
tw.Close();
} private static void ThreadCreator(object o)
{
Console.WriteLine("");
Console.WriteLine("Create Thread");
ThreadClass tc = new ThreadClass(se) { PersonList = PersonList, Name = "Thr" + i++.ToString() };//创建线程,传入信号量及共享资源 tc.Work();//让线程开始工作;
}
}
}
今天随便写了一段小程序,虽然不能用信号量存储信息,但可以借助Semaphor和lock()方法来完成,没有经过严格测试,如果有兴趣的话,希望大家帮忙测试一下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;namespace SemaphoreDemo
{
public class ThreadClass//声明一个类
{
public IList<PersonWorker> PersonList { get; set; }//传入共享资源
private Semaphore se = null;//信号量 public ThreadClass(Semaphore Semaphore)//创建类实例
{
se = Semaphore;
} public void Work()//让类开始工作
{
try
{
Console.WriteLine(Name + " Wait Semaphore " + DateTime.Now.ToString("HH:mm:ss"));
se.WaitOne();//等待信号量
Console.WriteLine(Name + " Got Semaphore " + DateTime.Now.ToString("HH:mm:ss")); PersonWorker Worker = null;//准备获取工作者
lock (PersonList)//锁定共享资源,若到这一步,则说明一定存在共享资源,否则会由信号量一直等待直到有线程释放了共享资源
{
foreach (PersonWorker pw in PersonList)
{
if (!pw.IsWorking)//
{
Worker = pw;//取得空闲的共享资源
break;
}
}
}
Worker.Run();//开始工作; }
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
Console.WriteLine(Name + " Release Semaphore " + DateTime.Now.ToString("HH:mm:ss"));
Console.WriteLine("*************************");
Console.WriteLine(""); se.Release();//释放信号量
}
} public string Name { get; set; }
} public class PersonWorker
{
private bool m_IsWorking = false;
public bool IsWorking { get { return m_IsWorking; } }
public string Name { get; set; } static Random r = null;
static PersonWorker()
{
r = new Random();
} public void Run()
{
int l = r.Next(10, 10000);//随机生成工作的时间,
Console.WriteLine("Thread=" + Name + " Time=" + l.ToString());
m_IsWorking = true;
Thread.Sleep(l);//模拟工作的时间,
m_IsWorking = false;//工作完成,
} }
}下面是主程序:using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;namespace SemaphoreDemo
{
class Program
{
private static Semaphore se;//信号量
private static List<PersonWorker> PersonList;//共享资源
private static int i = 0;//创建的线程个数 private static TextWriter tw = null;//把结果写入到当前目录的文本文件,以便于分析程序是否正确 static void Main(string[] args)
{
PersonList = new List<PersonWorker>() {
new PersonWorker(){Name="P0"},
new PersonWorker(){Name="P1"},
new PersonWorker(){Name="P2"}
};//创建共享资源 se = new Semaphore(PersonList.Count, PersonList.Count);//创建信号量
tw = new StreamWriter("log" + DateTime.Now.ToString("ddHHmmss") + ".txt");
Console.SetOut(tw);// Timer t = new Timer(ThreadCreator);//创建一定时器
t.Change(1000, 1000);//1秒触发一次
Console.ReadKey();
t.Dispose();
tw.Close();
} private static void ThreadCreator(object o)
{
Console.WriteLine("");
Console.WriteLine("Create Thread");
ThreadClass tc = new ThreadClass(se) { PersonList = PersonList, Name = "Thr" + i++.ToString() };//创建线程,传入信号量及共享资源 tc.Work();//让线程开始工作;
}
}
}