Thread Th5 = new Thread(delegate()
{
DataDispose(al1);
DataBC(DtConut);
});
你将DataBC(DtCount)放入到线程的结尾,不就可以了吗?只要执行完了DataDispose,自然就会执行DataBC另外使用委托的BeginInvoke方法,可以添加回调函数,来判断是否执行完成~ //回调函数
void Finish(IAsyncResult ar)
{
if (ar.IsCompleted)
{
Console.WriteLine("finished");
//code here
}
}
{
DataDispose(al1);
DataBC(DtConut);
});
你将DataBC(DtCount)放入到线程的结尾,不就可以了吗?只要执行完了DataDispose,自然就会执行DataBC另外使用委托的BeginInvoke方法,可以添加回调函数,来判断是否执行完成~ //回调函数
void Finish(IAsyncResult ar)
{
if (ar.IsCompleted)
{
Console.WriteLine("finished");
//code here
}
}
解决方案 »
- 无法将类型为“System.Windows.Forms.DataGridViewTextBoxCell”的对象强制转换为类型“System.Windows.Fo
- 类是用继承还是引用实例化调用呢?
- 如何给TabControl设置背景色啊
- 通过一个窗口A的按钮能控制另一个窗口B文本的运行
- 请高手进来看看,如何将这段VC代码转换成C#代码
- 请问在有DotNetBar的界面上如何响应DotNetBar上TreeView中Node的点击事件?
- [求购]视频源代码。
- 如何打开autocad dwg文件,求面积
- graphicspath.transform(matrix)是怎么变换的?
- 有什么办法除掉ComboBox的拉下来的黑框框!!
- 这个错误怎么改
- C#调用COM组件传数组问题
第一次尝试多线程
我写下我的需求
希望你能拜拜我
从本地读出txt文件
(txt文件一千五百多个)
数据 8G 将近一千五百万条数据
数据要处理 而且比较麻烦
我之前是把处理的数据直接insert数据
但发现很慢
后来我把数据放DataTable里面
然后用SqlBulkCopy导入数据库
但是问题又来了 处理数据很慢
把所以的数据处理完然完然后放入DataTable里面
我测试了一点点 估计全部的话要3天
所以我想用多线程来处理数据然后放入DataTable中
我想创建五个线程 理论上要快五倍
但我对多线程之前没有接触过
所以代码写的一片路七八糟
希望你帮我改下
十分感谢
这个量没多大,我们每月例行处理一个IIS LOG,资料清洗,建立多维数据集,资料量在 2-3亿, 大约 2 小时(当然跟设计,写法以及硬件条件有关)主要还是你提及的 "数据要处理 而且比较麻烦" 这里才是关键耗时点.从你之前给出的代码中,没看到有处理的部分.
源码 public void CountFrom(string[] sArr)
{
try
{
string[] SFM = sArr[2].Split('.');
//时间
string Time = null;
if (SFM.Length == 2)
{
if (SFM[1].Length > 3)
{
SFM[1] = SFM[1].Substring(0, 3);
}
Time = SFM[0] + "." + SFM[1];
}
else
{
Time = SFM[0];
}
string[] times = null;
times = Time.Split(':');
if (times.Length == 3)
{ }
else if(times.Length==2)
{
Time = "00:" + times[0] + ":" + times[1];
}
Time = sArr[1] +" "+ Time;
//纬度
string[] WD = sArr[3].Split('=');
string WDu = WD[1];
//经度
string[] JD = sArr[4].Split('=');
string JDu = JD[1];
//强度
string[] QD = sArr[5].Split('=');
string QDu = QD[1];
//陡度
string[] DD = sArr[6].Split('=');
string DDu = DD[1];
//误差
string[] WC = sArr[7].Split('=');
string WCu = WC[1];
//定位方式
string[] DW = sArr[8].Split(':');
string DWu = DW[1];
//省
string[] Sheng = sArr[9].Split(':');
string ShengY = Sheng[1];
//市
string[] Shi = sArr[10].Split(':');
string ShiY = Shi[1];
//县
string[] Xian = sArr[11].Split(':');
string XianY = Xian[1]; //获取省邮编
string sqlSheng = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}'", ShengY);
//获取市邮编
string sqlShi = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}' and cities.city='{1}'", ShengY, ShiY);
//获取县邮编
string sqlXian = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}' and cities.city='{1}' and areas.area='{2}'", ShengY, ShiY, XianY);
//判断是否是国外 国外邮编为空
if (!ShengY.Equals("国外"))
{
ShengY = ReturnZipcode(sqlSheng);
}
else
{
ShengY = "null";
}
//判断是否是国外 国外邮编为空
if (!ShiY.Equals("国外"))
{
ShiY = ReturnZipcode(sqlShi);
}
else
{
ShiY = "null";
}
//判断是否是国外 国外邮编为空
if (!XianY.Equals("国外"))
{
XianY = ReturnZipcode(sqlXian);
}
else
{
XianY = "null";
}
//导入数据库sql语句
//string sql = string.Format("INSERT INTO [CLRAS].[dbo].[T_LIGHTNING]([OCCURTIME],[JINDU],[WEIDU] ,[INTENSITY],[GRADIENT] ,[CHARGE]" +
// ",[ENERGY],[OFFSET],[LOCATEMODE],[OCCURNS] ,[TODAYID],[PROVINCE],[COUNTY],[CITY],[GRIDKEY])" +
// "VALUES('{0}',{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},{14})",
// Time, WDu, JDu, QDu, DDu, WCu, DWu, 0, 0, 0, 0, 0, ShengY == null ? "null" : ShengY, ShiY == null ? "null" : ShiY, XianY == null ? "null" : XianY); DtConut.Rows.Add(new object[] { Time, WDu, JDu, QDu, DDu, WCu, DWu, 0, 0, 0, 0, 0, ShengY == null ? "null" : ShengY, ShiY == null ? "null" : ShiY, XianY == null ? "null" : XianY});
//try
//{
// Database db = DatabaseFactory.CreateDatabase();
// SqlCommand cmd = new SqlCommand(sql);
// if (db.ExecuteNonQuery(cmd) > 0)
// {
// IndexCheng++;
// }
// cmd.Dispose();
//}
//catch (Exception ex)
//{ // indexConut++;
//} }
catch (Exception ex)
{
indexConut++;
}
}
源码 public void CountFrom(string[] sArr)
{
try
{
string[] SFM = sArr[2].Split('.');
//时间
string Time = null;
if (SFM.Length == 2)
{
if (SFM[1].Length > 3)
{
SFM[1] = SFM[1].Substring(0, 3);
}
Time = SFM[0] + "." + SFM[1];
}
else
{
Time = SFM[0];
}
string[] times = null;
times = Time.Split(':');
if (times.Length == 3)
{ }
else if(times.Length==2)
{
Time = "00:" + times[0] + ":" + times[1];
}
Time = sArr[1] +" "+ Time;
//纬度
string[] WD = sArr[3].Split('=');
string WDu = WD[1];
//经度
string[] JD = sArr[4].Split('=');
string JDu = JD[1];
//强度
string[] QD = sArr[5].Split('=');
string QDu = QD[1];
//陡度
string[] DD = sArr[6].Split('=');
string DDu = DD[1];
//误差
string[] WC = sArr[7].Split('=');
string WCu = WC[1];
//定位方式
string[] DW = sArr[8].Split(':');
string DWu = DW[1];
//省
string[] Sheng = sArr[9].Split(':');
string ShengY = Sheng[1];
//市
string[] Shi = sArr[10].Split(':');
string ShiY = Shi[1];
//县
string[] Xian = sArr[11].Split(':');
string XianY = Xian[1]; //获取省邮编
string sqlSheng = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}'", ShengY);
//获取市邮编
string sqlShi = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}' and cities.city='{1}'", ShengY, ShiY);
//获取县邮编
string sqlXian = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}' and cities.city='{1}' and areas.area='{2}'", ShengY, ShiY, XianY);
//判断是否是国外 国外邮编为空
if (!ShengY.Equals("国外"))
{
ShengY = ReturnZipcode(sqlSheng);
}
else
{
ShengY = "null";
}
//判断是否是国外 国外邮编为空
if (!ShiY.Equals("国外"))
{
ShiY = ReturnZipcode(sqlShi);
}
else
{
ShiY = "null";
}
//判断是否是国外 国外邮编为空
if (!XianY.Equals("国外"))
{
XianY = ReturnZipcode(sqlXian);
}
else
{
XianY = "null";
}
//导入数据库sql语句
//string sql = string.Format("INSERT INTO [CLRAS].[dbo].[T_LIGHTNING]([OCCURTIME],[JINDU],[WEIDU] ,[INTENSITY],[GRADIENT] ,[CHARGE]" +
// ",[ENERGY],[OFFSET],[LOCATEMODE],[OCCURNS] ,[TODAYID],[PROVINCE],[COUNTY],[CITY],[GRIDKEY])" +
// "VALUES('{0}',{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},{14})",
// Time, WDu, JDu, QDu, DDu, WCu, DWu, 0, 0, 0, 0, 0, ShengY == null ? "null" : ShengY, ShiY == null ? "null" : ShiY, XianY == null ? "null" : XianY); DtConut.Rows.Add(new object[] { Time, WDu, JDu, QDu, DDu, WCu, DWu, 0, 0, 0, 0, 0, ShengY == null ? "null" : ShengY, ShiY == null ? "null" : ShiY, XianY == null ? "null" : XianY});
//try
//{
// Database db = DatabaseFactory.CreateDatabase();
// SqlCommand cmd = new SqlCommand(sql);
// if (db.ExecuteNonQuery(cmd) > 0)
// {
// IndexCheng++;
// }
// cmd.Dispose();
//}
//catch (Exception ex)
//{ // indexConut++;
//} }
catch (Exception ex)
{
indexConut++;
}
}
/// <summary>
/// 获取txt文本数据 并且转换格式
/// </summary>
/// <param name="al"></param>
public void DataDispose(ArrayList al)
{
for (int i = 0; i < al.Count; i++)
{
//读取第一行的txt文件数据
string sTemp = al[i].ToString();
//以空格 截取
string[] sArr = sTemp.Split(new string[] { " " }, StringSplitOptions.None);
if (sArr.Length < 11)
{
sArr = sTemp.Split(new string[] { "\t" }, StringSplitOptions.None);
} CountFrom(sArr);
}
}
if (!ShengY.Equals("国外"))
{
ShengY = ReturnZipcode(sqlSheng);
}
else
{
ShengY = "null";
}
//判断是否是国外 国外邮编为空
if (!ShiY.Equals("国外"))
{
ShiY = ReturnZipcode(sqlShi);
}
else
{
ShiY = "null";
}
//判断是否是国外 国外邮编为空
if (!XianY.Equals("国外"))
{
XianY = ReturnZipcode(sqlXian);
}"ReturnZipcode这里就是你的瓶颈点,每次去数据库中校验省,县,市?
ReturnZipcode方法是匹配邮编
//获取省邮编
//获取市邮编
//获取县邮编将这些邮编内容缓存到服务器内存中.每次连接数据库开销是很大的,并且你每条都使用以下的语句(还一堆的join)去访问数据库一千万条,你自己想下要耗费多少时间?=============解决以上的问题再跟我来谈多线程. //获取省邮编
string sqlSheng = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}'", ShengY);
//获取市邮编
string sqlShi = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}' and cities.city='{1}'", ShengY, ShiY);
//获取县邮编
string sqlXian = string.Format("select top 1 zipcode.zip from provinces " +
"join cities on provinces.provinceid=cities.provinceid " +
"join areas on cities.cityid=areas.cityid " +
"join zipcode on areas.areaid=zipcode.areaid " +
"where province='{0}' and cities.city='{1}' and areas.area='{2}'", ShengY, ShiY, XianY);没有啊 我先判断是否是国外的 是国外的我直接幅值为空 不是国外的我才往数据库里匹配邮编
2、改用Task实现。Task中有一个等待的方法task.Wait(),也有一个在任务完成后继续的方法Task.ContinueWith。
方法一:用B的Join方法把B的执行临时加入直到B执行完,操作系统会判断B是否执行完,程序员不需要操心;
方法二:设置一个公用的变量,在B线程代码的最后修改这个公用变量的值,通过判断这个值是否修改来判断B是否执行完。
方法一在封装的Join方法中操作系统判断线程执行完应该是以发中断信号的方式告诉操作系统的调度程序,方法二中程序员通过轮询一个变量的方式来进行线程调度,这种方式只是多占用了一部分cpu时间。本质上操作系统实现线程的调度应该主要是这两种方法,还可以通过发消息,但发消息是这两种方法的变体。
static int flag = 0;
static void Main(string[] args)
{
int ct = 10;
while (ct-- > 0)
{
Console.WriteLine("开启第{0}个线程", 10 - ct);
new Thread(Foo).Start();
} while (true)
{
if (flag < 10)
{
Console.WriteLine("等待");
System.Threading.Thread.Sleep(1000);
}
else
{
Console.WriteLine("所有线程完成");
break;
}
} Console.Read();
} static void Foo()
{
int a = new Random().Next(10000);
System.Threading.Thread.Sleep(a); Interlocked.Increment(ref flag);
}用标识位