老板上编写一个数据库同步模块,来同步两个数据库中的30个数据表,数据表的更新通过一个类来实现,每个表的数据量不同,如果用单线程需要等很长时间;老板的要求是通过n个线程(比如5个)同时工作,分别同步不同的数据表。查询了很多资料,搞了两个礼拜也没有搞明白,我的基本思路是这样的:获取数据表清单TableListint ThreadCount=0;foreach table in TableList
  if (ThreadCount>5)
   {
     Wait();
   }
   else
   {
   添加一新的进程来处理:Update(table);
   }nextvoid update(string talbe){
  处理前:ThreadCount++;
  
  处理过程……  处理完成后:TreadCount--;
}俺也是刚刚开多线程,不知道是思路问题还是什么原因,一直没有查到可用解决办法!请各路大侠指条明路,万分感谢!

解决方案 »

  1.   

    这么快就有回复了,感动中!功能目标:通过多个线程(可指定)完成一个包含多个相同任务的任务集。
    我已经试用了Threadpool,但其中的waithandle类搞不明白,用不好目前的问题:
    1、如何记录线程数的变化,用于将线程数控制在指定的范围(Interlocked似乎是最佳方案?);
    2、如何实现当线程达到上限后的等待功能,即如何某一个线程已经结束,并开始一个新的线程。(Waitone或WaitAny似乎可以解决?两者区别在哪里?)
    3、到最后阶段,如何知道所有线程都已经结束了(WaitAll如何使用?)
    4、测试时,偶尔出现DataReader冲突,是否需要在相应的部分加代码锁?
      

  2.   

    1)可以使用线程池指定最大线程数.
    2)Waitone当在派生类中重写时,阻止当前线程,直到当前的 WaitHandle 收到信号,而WaitAny 等待指定数组中的任一元素收到信号
    3)http://msdn.microsoft.com/zh-cn/library/z6w25xa6(VS.80).aspx
    4)需要,定义一信号量Mutex m = new Mutex();
      

  3.   

    谢谢!yilunduyue说的应该很对,可是小弟仍不太明白具体如何操作,能否说得更明白些,最好说整个思路描述一下!
      

  4.   

    我对楼主的sql同步很感兴趣,是A同步到B,还是双向的?具体是怎么个思路?看到用datareader ,是否是从A读出来然后去B比较,并更新?
      

  5.   

    哈哈,关于数据同步真是一言难尽呀,我最终是将所有的操作记录下来,然后根据操作记录进行双向的同步(Ms SQL的replication似乎也是这样模式,因为我的数据库没有固定IP和管理员权限,同时数据量不太大,所以就DIY了一个),相关的操作记录是在存储过程中实现的。
      

  6.   

    我来给你讲下线程思路
    首先准备好你的线程池(可以事系统线程池,也可以是你自己的线程数组)比如准备5线程。
    然后准备你的任务列表,比如你要同步38个表。这里有两种办法,分布到线程组里边去。
    第一,先分组,38除以5,得到7余三,补整后得到8.就是我们把任务分成5组,每组8个(实际上有三个线程得到了7个,少一个就少一个吧,只要你吧数学搞对别出错就可以);
    然后把这些个任务组分别指派给你那些线程的实际方法(怎么传参数你应该会吧) 然后启动他们。
    每个线程完成后触发一个事件,高速主程序,都完成后,任务结束。
    这个方法,效率不是很高,数据量的不同,后期会有些不均衡。
    第二个办法是任务流水线,线程循环用。(这个最好用线程池,用自定义线程就会浪费些效率)
    把38个任务直接排队成一个流水线队列。然后启动线程分配循环(这本身就是个线程,也可以就是主线程)取到一个闲置线程,就分发给他一个任务去执行。继续分配下一个。直到任务分配完。 循环中,如果取不到空闲线程(已经达到5个了)就等待.(线程数的控制你可以自己限定一个值,通过计数器来控制,也可以直接吧线程池的大小设计成一个值,比如7~8,程序本身的线程数加你的主控线程加任务线程肯定不是5,而是至少7以上)
    队列分配完成,并且你的线程都返回以后就可以结束程序了。
    (小建议,再给你的38个表安排队列的时候,如果可以事先得出它的任务量,就是你要处理的条数和操作次数,建议按照工作量倒序排队。这样整个执行期间的效率会比较高)
    另外,楼主的同步工作我比较感兴趣,是双向的么?是通过全部数据比对(太笨的办法了)?还是通过什么特殊技巧?
    我曾经见过一个公司的产品,同时差异同步20个数据库(每库大约3张表,每表每天更新和新加大约5000记录,每个记录都有IMAGE字段和8~10个字符串字段)大约十秒多就完成了。(A服务器上的20个库,同步到B服务器上的2个库中。10比1的汇总同步。)  到现在也不明白怎么跟踪的数据变化。
      

  7.   

    一言难尽呀!顶顶楼上.
    --------------
    我来给你讲下线程思路 
    首先准备好你的线程池(可以事系统线程池,也可以是你自己的线程数组)比如准备5线程。 
    然后准备你的任务列表,比如你要同步38个表。这里有两种办法,分布到线程组里边去。 
    第一,先分组,38除以5,得到7余三,补整后得到8.就是我们把任务分成5组,每组8个(实际上有三个线程得到了7个,少一个就少一个吧,只要你吧数学搞对别出错就可以); 
    然后把这些个任务组分别指派给你那些线程的实际方法(怎么传参数你应该会吧) 然后启动他们。 
    每个线程完成后触发一个事件,高速主程序,都完成后,任务结束。 
    这个方法,效率不是很高,数据量的不同,后期会有些不均衡。 
    第二个办法是任务流水线,线程循环用。(这个最好用线程池,用自定义线程就会浪费些效率) 
    把38个任务直接排队成一个流水线队列。然后启动线程分配循环(这本身就是个线程,也可以就是主线程)取到一个闲置线程,就分发给他一个任务去执行。继续分配下一个。直到任务分配完。 循环中,如果取不到空闲线程(已经达到5个了)就等待.(线程数的控制你可以自己限定一个值,通过计数器来控制,也可以直接吧线程池的大小设计成一个值,比如7~8,程序本身的线程数加你的主控线程加任务线程肯定不是5,而是至少7以上) 
    队列分配完成,并且你的线程都返回以后就可以结束程序了。 
    (小建议,再给你的38个表安排队列的时候,如果可以事先得出它的任务量,就是你要处理的条数和操作次数,建议按照工作量倒序排队。这样整个执行期间的效率会比较高) 
    另外,楼主的同步工作我比较感兴趣,是双向的么?是通过全部数据比对(太笨的办法了)?还是通过什么特殊技巧? 
    我曾经见过一个公司的产品,同时差异同步20个数据库(每库大约3张表,每表每天更新和新加大约5000记录,每个记录都有IMAGE字段和8~10个字符串字段)大约十秒多就完成了。(A服务器上的20个库,同步到B服务器上的2个库中。10比1的汇总同步。)  到现在也不明白怎么跟踪的数据变化。 
      

  8.   

    程序的主干(控制部分)是类似这样的:
    using System;
    using System.Collections.Generic;
    using System.Threading;namespace ConsoleApplication1
    {
        class Program
        {
            static void Main(string[] args)
            {
                List<string> tableNames = 访问你的数据库取得要同步更新的表的名字集合()
                List<Thread> threads = new List<Thread>();
                foreach (string tn in tableNames)
                {
                    Task k = new Task();
                    k.TableName = tn;
                    Thread t = new Thread(new ThreadStart(k.同步));
                    threads.Add(t);
                    t.Start();
                }
                foreach (Thread t in threads)
                    t.Join();
            }        class Task
            {
                public string TableName;            public void 同步()
                {
                    Console.WriteLine(string.Format("开始同步表[{0}]", this.TableName));
                    //todo : 同步表
                    Console.WriteLine(string.Format("同步表[{0}]................完成", this.TableName));
                }
            }
        }
    }
      

  9.   

    大家这么热情,谢谢!谢谢!
    不过说到这里似乎有点乱,我来理理!我更接受fengyecsdn 的第二个方案,用5个线程循环做业,处理不同的表:数量的小的,很快结束后释放进程去处理下一个表,而数据量大的表则工作线程持续工作。目前面临的问题一方面是如何控制和分配线程,同时又发现了一个新问题,就是数据库的瓶颈,目前总是报错说,一个相关的DataReader正在运行,请先关闭它。到底ado.net是不是支持多线程、多通道呢?
      

  10.   

    如果你需要使用线程池,你可以写:
    using System;
    using System.Collections.Generic;
    using System.Threading;namespace ConsoleApplication1
    {
        class Program
        {
            static void Main(string[] args)
            {
                List<string> tableNames = new List<string>();  //todo:访问你的数据库取得要同步更新的表的名字集合()
                List<Task> tasks = new List<Task>();
                foreach (string tn in tableNames)
                {
                    Task k = new Task();
                    k.TableName = tn;
                    tasks.Add(k);
                    ThreadPool.QueueUserWorkItem(new WaitCallback(k.同步));
                }
                while (true)
                {
                    bool flag = true;//如果发现有一个还没有结束的,返回false。
                    foreach (Task t in tasks)
                        if (!t.是否结束)
                        {
                            flag = false;
                            break;
                        }
                    if (flag)
                        break;                Thread.Sleep(2000);
                }
            }        class Task
            {
                public string TableName;            public bool 是否结束 = false;            public void 同步(object obj)
                {
                    Console.WriteLine(string.Format("开始同步表[{0}]", this.TableName));
                    //todo : 同步表
                    Console.WriteLine(string.Format("同步表[{0}]................完成", this.TableName));
                    this.是否结束 = true;
                }
            }
        }
    }
      

  11.   

    [Quote=引用 17 楼 Andy_shi 的回复:]
    目前面临的问题一方面是如何控制和分配线程,同时又发现了一个新问题,就是数据库的瓶颈,目前总是报错说,一个相关的DataReader正在运行,请先关闭它。到底ado.net是不是支持多线程、多通道呢?数据库连接当然应该在单个线程中打开和关闭了,不能共享。
      

  12.   

    对于windows操作系统来说,几百个线程也一点不慢。而对于大型数据库,几百个客户端也不慢(何况你还是各个表分开)。5个线程,对于使用windows操作系统有点抠门了,15年以前的操作系统教材上常会那样说。你毕竟不是做一个bt软件让全世界的用户去访问你的服务器,你只有一个客户端专门干这个事,所以即使开200个线程也不算很多。真正可能慢的是你比较记录的方法,不同的比较方法,可能有“5个小时和5分钟之间”的时间差别。
      

  13.   


    首先谢谢sp1234!我想确认的是,Ado.net是否支持不同的Connection,同时执行相关的fill操作或ExcuteReader操作,或者说,按上边的方面实施,出现了“一个相关的DataReader正在运行,请先关闭它”错误提示可能是哪里出问题了?另外,检测进程是否结束,可以通过waithandle的回调来实现(这是我刚学的)。
      

  14.   

    我现在处理多线程的时候,基本上和fengyecsdn 的第二个方案一样,会有一个动态获取任务的线程,还有一堆执行任务的工作线程,很像sokcet编程一样,一个监听,一堆工作。只是我在想同步数据库表记录的话,就相当于必须把所有任务写成同一个类,不考虑数据表结构或执行步骤的差异,那如果这样是不是可以用存储过程来执行呢?毕竟用datareader读,还是和表结构有关系的。所以可以写个程序和 计划任务 一样,工作线程里只执行存储过程,往程序里传值的时候也就只要传个存储过程名就行了
      

  15.   

    也可以直接利用委托        private static int count;        static void Main(string[] args)
            {
                Queue<Action<object>> tasks = new Queue<Action<object>>();
                // init tasks
                for (int i = 0; i < 20; i++)
                {
                    int j = i;
                    tasks.Enqueue(delegate
                    {
                        Console.WriteLine("start task {0}", j);
                        for (int m = 0; m < 100 * (j % 4 + 6); m++)
                            Thread.Sleep(1);
                        Console.WriteLine("end task {0}", j);
                    });
                }
                count = 0;
                while (tasks.Count > 0)
                {
                    if (count < 5)
                    {
                        Action<object> task = tasks.Dequeue();
                        Interlocked.Increment(ref count);
                        task.BeginInvoke(null, delegate(IAsyncResult ar)
                        {
                            ((Action<object>)ar.AsyncState).EndInvoke(ar);
                            Interlocked.Decrement(ref count);
                        }, task);
                    }
                    else
                    {
                        Thread.Sleep(1);
                    }
                }
                while (count > 0)
                    Thread.Sleep(1);
                Console.WriteLine("Completed.");
                Console.ReadLine();
            }
      

  16.   

    同意sp1234说的,可以自己写Thread组里处理,或者直接使用ThreadPool.QueueUserWorkItem线程池来进行操作
      

  17.   

    public void SyncMain(string[] tableName)
    {
        ManualResetEvent _mainResetEvent = new ManualResetEvent(false);
        _mainThread = new Thread(new ParameterizedThreadStart(SyncDataBase));
        _mainThread.SetApartmentState(ApartmentState.MTA);
        _mainThread.Start(new object[] { tableName, mainResetEvent});
        //等待主线程更新完成
        _mainResetEvent.WaitOne();
    }
    private void SyncMain(object args)
    {
        object[] _args = (object[])args;
        string[] tablenames = (string[])_args[0];
        ManualResetEvent _mainResetEvent = (ManualResetEvent)_args[1];
        //启动多线程同步数据库,每张表一个线程
        List<ManualResetEvent> _waitHandles =  new List<ManualResetEvent>();
        foreach(string tblname in tablenames)
        {
            ManualResetEvent _oneHandle = new ManualResetEvent(false);
            _waitHandles.Add(_oneHandle);
            SyncState _state = new SyncState(tblname,_oneHandle);
            ThreadPool.QueueUserWorkItem(new WaitCallback(SyncTable), _state);        
        }
        //等待所有线程均返回成功消息
        WaitHandle.WaitAll(_waitHandles.ToArray());
        //响应主线程
        mainResetEvent.Set();
    }private void SyncTable(object state)
    {
        SyncState _state = (SyncState)state;
        //开始同步;
        //Sync code in here;
        //同步完成
        _state.ResetEvent.Set();
    }internal class SyncState
    {
        public string TableName;
        public ManualResetEvent ResetEvent;    public DownStateInfo(string tablename, ManualResetEvent resetEvent)
        {
            this.ChunkSize = chunksize;
            this.ResetEvent = resetEvent;
        }
    }
      

  18.   

    楼主的服务器是什么呢?
    PCSERVER? 刀片? 小型机?
    我们的双四核至强刀片机之间汇总数据开50我觉得就很不错了。因为网络存储设备的响应能力有限。  如果是本地磁盘阵列,100也许更好吧
    真开500我觉得反而让磁盘控制很慢了。至于DATAREADER 我不建议用在这种并发大量数据密集的操作中,当然也许是我的习惯和风格不合适吧。  我觉得还是数据块的方式使用,从A直接取出一个段(可以是一个DATATABLE)再内存中作需要的处理,然后一次写到B上, 然后再去A取,
    如果能优化算法,保证不会出现漏操作。甚至可以双线程协同。一个不停的读,一个不停的写。哈哈。(不过在上边多线程模式下就意义不大了.线程调度开销要控制好,不然是画蛇添足了。)
      

  19.   


    你不会认为数据库同步是在一台繁忙的服务器内部自己本地两个数据库之间进行的吧。那么这首先应该是技术管理者的问题,而不是编程问题。我的理解他应该是面向远程编程,或者每同步一个表需要很长时间(例如几分钟)。“网络存储设备”是指具体什么东西呢?线程的数据跟速度成正比么?我想楼主并没有直接对磁盘底层的I/O接口编程,而是对楼主所说的大型数据库编程!当它访问大型数据库,它作为客户端对服务器的请求将被排队(满足数据库自动管理的并发数限制),因此线程自己会被hang住的,因此实际上这个“”跟数据库并无关系。
    当我只是把几个G的字节使用磁盘驱动器的底层api写入,那么自然我会认为最好只并发2、3个线程就够了,因为磁盘就是如此。当我们只是读取本地数据库,在内存中进行处理,然后写到本地数据库,那么并发数量可以保持中等。但是如果我们是使用SQL,并且是远程的同步,那么同步线程可以比中等的多好几倍也不拥挤。
      

  20.   

    最后,很难想象一个有经验的技术管理者会拿 业务Portal服务器 做这种“突发巨量”的同步任务。这个任务应该放在几乎专用的备份服务器上,并且选择整个网络“沉睡”的时候。因此如果考虑服务器的性能,应该考虑一个比较干净的运行环境,而不能把它放在一个繁忙拥挤(有很多重要服务在运行)的环境中考虑。
      

  21.   


    当然不是一台机器了。
    举例来说,20台刀片机,是普通服务器,用NAS或ISCSI
    3台汇总数据库也是刀片机但是挂的是高速NAS。 以及两台小型机跑更大的数据库。
    有域有子网,也有四层交换等等网络环境。我们这里的实际测试结果是,数据库如果放在NAS和ISCSI上,那么汇总工作开100线程比开200要快,放在RIAD本地磁盘上200线要快。而且我们的同步工作是不中断服务的,也就是说两边的服务器仍然在工作,有几十个工作请求仍在处理。
      

  22.   

    如果是单独的数据库处理数字类型的数据,那基本上可以说吧服务器逼到极限也没什么大不了的。
    但是在实际使用情况下,我们要同步的字段很多是IMAGE等类型,一条记录可能是2K~500K,平均一次同步就有有约3G的数据。内存再大也会吧压力转移到硬盘操作上。而NAS和ISCSI的响应时间比本地RIAD就是慢很多很多的。
      

  23.   

    可以参考一下这里,楼主可能有答案,
    http://blog.csdn.net/zhzuo/archive/2004/07/08/37262.aspx
    http://blog.csdn.net/zhzuo/archive/2004/06/10/22037.aspx
    http://blog.csdn.net/zhzuo/archive/2008/07/23/2699305.aspx
    http://blog.csdn.net/zhzuo/archive/2008/07/23/2699847.aspx