我写了一个exe来处理大量的zip文件解压入库,思路是这样的。打开zip文件所在文件夹,分30个线程入库。但问题出来了,每次入库的结果要比实际的多一些。理论上应该是zip文件数>=数据库记录数,但实际上数据库记录数>zip文件数,我怀疑是线程控制出了问题导致文件重复入库,但我没查出来是哪里出了问题。请大家帮帮忙,谢谢代码:
Form1
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;
using System.Data.SqlClient;
using ICSharpCode.SharpZipLib.Zip;
using System.IO;
using System.Threading;namespace GameDataInsert2
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
} private void Form1_Load(object sender, EventArgs e)
{
setthread(); //Insert i = new Insert();
//i.Filename = "D:\\开发\\vs\\testdata\\1 - 复制.zip";
//i.InsertGame(); //inputGameData();
}
void inputGameData()
{ int threadnum = Convert.ToInt32(System.Configuration.ConfigurationSettings.AppSettings["threadnum"].ToString());
string folder = System.Configuration.ConfigurationManager.AppSettings["game"].ToString(); DirectoryInfo dir = new DirectoryInfo(folder); FileInfo[] fn = dir.GetFiles("*.zip"); for (int i = 0; i < fn.Length; i++)
{
//检查哪个线程空闲,新开线程
for (int ii = 0; ii < threadnum; ii++)
{
if (theth[ii].IsAlive == false)
{
string infile = dir.FullName + "\\" + fn[i].Name; Insert ins = new Insert();
ins.FileName = infile; ThreadStart thr_start_func = new ThreadStart(ins.InsertGame); theth[ii] = new Thread(thr_start_func);
theth[ii].Name = infile;
theth[ii].Start();
ii = threadnum + 1;
}
} } }
#region 线程控制
public static Thread fThread1;
public static Thread fThread2;
public static Thread fThread3;
public static Thread fThread4;
public static Thread fThread5; public static Thread fThread6;
public static Thread fThread7;
public static Thread fThread8;
public static Thread fThread9;
public static Thread fThread10; public static Thread fThread11;
public static Thread fThread12;
public static Thread fThread13;
public static Thread fThread14;
public static Thread fThread15; public static Thread fThread16;
public static Thread fThread17;
public static Thread fThread18;
public static Thread fThread19;
public static Thread fThread20; public static Thread fThread21;
public static Thread fThread22;
public static Thread fThread23;
public static Thread fThread24;
public static Thread fThread25;
public static Thread fThread26;
public static Thread fThread27;
public static Thread fThread28;
public static Thread fThread29;
public static Thread fThread30; public static Thread[] theth ={ fThread1,
fThread2, fThread3
, fThread4 ,fThread5, fThread6,
fThread7, fThread8,fThread9, fThread10,fThread11, fThread12,fThread13, fThread14, fThread15, fThread16,fThread17, fThread18,fThread19, fThread20
,fThread21, fThread22 , fThread23, fThread24, fThread25, fThread26, fThread27, fThread28
, fThread29, fThread30
};
#endregion void setthread()
{
for (int i = 0; i < theth.Length; i++)
{
ThreadStart thr_start_func = new ThreadStart(inputtmp);
theth[i] = new Thread(thr_start_func);
theth[i].Name = Convert.ToString(i + 1);
// Thread.Sleep(30);
theth[i].Start();
} // input(); } void inputtmp()
{ }
private void t_Tick(object sender, EventArgs e)
{
t.Enabled = false;
inputGameData();
t.Enabled = true;
} }
}
app.config中
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings> <add key ="conn166" value="server=192。168。1.166;user id=sa;password=******;database=i8game"/>
<add key="game" value="D:\\开发\\vs\\testdata\\"/> <add key="threadnum" value="10"/>
</appSettings>
</configuration>
Form1
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;
using System.Data.SqlClient;
using ICSharpCode.SharpZipLib.Zip;
using System.IO;
using System.Threading;namespace GameDataInsert2
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
} private void Form1_Load(object sender, EventArgs e)
{
setthread(); //Insert i = new Insert();
//i.Filename = "D:\\开发\\vs\\testdata\\1 - 复制.zip";
//i.InsertGame(); //inputGameData();
}
void inputGameData()
{ int threadnum = Convert.ToInt32(System.Configuration.ConfigurationSettings.AppSettings["threadnum"].ToString());
string folder = System.Configuration.ConfigurationManager.AppSettings["game"].ToString(); DirectoryInfo dir = new DirectoryInfo(folder); FileInfo[] fn = dir.GetFiles("*.zip"); for (int i = 0; i < fn.Length; i++)
{
//检查哪个线程空闲,新开线程
for (int ii = 0; ii < threadnum; ii++)
{
if (theth[ii].IsAlive == false)
{
string infile = dir.FullName + "\\" + fn[i].Name; Insert ins = new Insert();
ins.FileName = infile; ThreadStart thr_start_func = new ThreadStart(ins.InsertGame); theth[ii] = new Thread(thr_start_func);
theth[ii].Name = infile;
theth[ii].Start();
ii = threadnum + 1;
}
} } }
#region 线程控制
public static Thread fThread1;
public static Thread fThread2;
public static Thread fThread3;
public static Thread fThread4;
public static Thread fThread5; public static Thread fThread6;
public static Thread fThread7;
public static Thread fThread8;
public static Thread fThread9;
public static Thread fThread10; public static Thread fThread11;
public static Thread fThread12;
public static Thread fThread13;
public static Thread fThread14;
public static Thread fThread15; public static Thread fThread16;
public static Thread fThread17;
public static Thread fThread18;
public static Thread fThread19;
public static Thread fThread20; public static Thread fThread21;
public static Thread fThread22;
public static Thread fThread23;
public static Thread fThread24;
public static Thread fThread25;
public static Thread fThread26;
public static Thread fThread27;
public static Thread fThread28;
public static Thread fThread29;
public static Thread fThread30; public static Thread[] theth ={ fThread1,
fThread2, fThread3
, fThread4 ,fThread5, fThread6,
fThread7, fThread8,fThread9, fThread10,fThread11, fThread12,fThread13, fThread14, fThread15, fThread16,fThread17, fThread18,fThread19, fThread20
,fThread21, fThread22 , fThread23, fThread24, fThread25, fThread26, fThread27, fThread28
, fThread29, fThread30
};
#endregion void setthread()
{
for (int i = 0; i < theth.Length; i++)
{
ThreadStart thr_start_func = new ThreadStart(inputtmp);
theth[i] = new Thread(thr_start_func);
theth[i].Name = Convert.ToString(i + 1);
// Thread.Sleep(30);
theth[i].Start();
} // input(); } void inputtmp()
{ }
private void t_Tick(object sender, EventArgs e)
{
t.Enabled = false;
inputGameData();
t.Enabled = true;
} }
}
app.config中
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings> <add key ="conn166" value="server=192。168。1.166;user id=sa;password=******;database=i8game"/>
<add key="game" value="D:\\开发\\vs\\testdata\\"/> <add key="threadnum" value="10"/>
</appSettings>
</configuration>
using System.Collections.Generic;
using System.Text;
using System.IO;
using ICSharpCode.SharpZipLib.Zip;
using System.Data.SqlClient;
using System.Data;
using System.Windows.Forms;
using System.Threading;namespace GameDataInsert2
{
class Insert
{
string filename = ""; public string FileName
{
get { return filename; }
set { filename = value; }
} public void InsertGame()
{ string folder = System.Configuration.ConfigurationManager.AppSettings["game"].ToString();
FileInfo f = new FileInfo(filename);
if (f.Exists == false)
return; DirectoryInfo dir = new DirectoryInfo(folder); try
{
//取得nid
string nid = "000000";
string[] ff = f.Name.ToString().Split('-');
string ip = "127.0.0.1";
if (ff.Length > 5)
{
ip = ff[1] + "." + ff[2] + "." + ff[3] + "." + ff[4];
} nid = ff[0];
string dt = f.LastWriteTime.ToString(); //如果时间不到5分钟.不插入
TimeSpan aa = DateTime.Now - f.CreationTime;
if (aa.TotalMinutes >= 5)
{
//读取zip文件.解压
string str = ""; #region 特殊处理,将第一位改为ASC 50 Stream zipstream = null;
using (FileStream fstmp = new FileStream(f.FullName, FileMode.Open))
{
long ii = fstmp.Length; byte[] b1 = new byte[ii]; fstmp.Read(b1, 0, b1.Length); fstmp.Close(); byte[] b2 = new byte[512000]; byte[] tmp1 = System.Text.Encoding.ASCII.GetBytes("P");
Array.Copy(tmp1, 0, b2, 0, 1); Array.Copy(b1, 1, b2, 1, b1.Length - 1); zipstream = new MemoryStream(b2, 0, b1.Length);
}
#endregion using (ZipInputStream s = new ZipInputStream(zipstream))
{
//ms.Close();
ZipEntry theEntry;
try
{ #region 解压
while ((theEntry = s.GetNextEntry()) != null)
{
string fileName = Path.GetFileName(theEntry.Name); if (fileName != String.Empty)
{
int size = 2048;
byte[] data = new byte[2048];
while (true)
{
size = s.Read(data, 0, data.Length);
if (size > 0)
{ Encoding asc = System.Text.Encoding.GetEncoding("GB2312"); str += asc.GetString(data, 0, size); }
else
{ break;
} } }
}
s.Close();
s.Dispose();
#endregion #region 成功取得文件内容,入库
string strConn = System.Configuration.ConfigurationManager.AppSettings["conn166"].ToString(); using (SqlConnection MyConn = new SqlConnection(strConn))
{
MyConn.Open();
string cmdText = "i8game_sreportgame";
SqlCommand executeCmd = new SqlCommand();
executeCmd.Connection = MyConn;
// 指出executeCmd是执行存储过程
executeCmd.CommandType = CommandType.StoredProcedure;
executeCmd.CommandText = cmdText;
executeCmd.CommandTimeout = 60; // 参数一
SqlParameter parameter = new SqlParameter("@reportCont", SqlDbType.NText);
parameter.Value = str;
executeCmd.Parameters.Add(parameter);
// 参数二
parameter = new SqlParameter("@sip", SqlDbType.Char, 20);
parameter.Value = ip;
executeCmd.Parameters.Add(parameter); parameter = new SqlParameter("@date", SqlDbType.DateTime);
parameter.Value = Convert.ToDateTime(dt);
executeCmd.Parameters.Add(parameter);
executeCmd.ExecuteNonQuery();
executeCmd.Dispose();
executeCmd.Cancel();
MyConn.Close();
} #endregion }
catch (Exception ex)
{
s.Close();
s.Dispose();
WriteErrorLog(ex.ToString() + "\n" + filename);
moveToErrfiles(dir.FullName, f.Name);
}
}
try
{
#region 将数据移到备份表中
if (!Directory.Exists(dir.FullName + "\\..\\databak\\" + DateTime.Now.ToString("yyyyMMdd")))
{
Directory.CreateDirectory(dir.FullName + "\\..\\databak\\" + DateTime.Now.ToString("yyyyMMdd"));
}
if (File.Exists(dir.FullName + "\\" + f.Name))
{
f.MoveTo(dir.FullName + "\\..\\databak\\" + DateTime.Now.ToString("yyyyMMdd") + "\\" + f.Name);
} #endregion
}
catch (Exception ex)
{
WriteErrorLog(ex.ToString() + "\n" + filename);
moveToErrfiles(dir.FullName, f.Name);
} }
}
catch (Exception ex)
{
WriteErrorLog(ex.ToString() + "\n" + filename);
moveToErrfiles(dir.FullName, f.Name);
}
} public void WriteErrorLog(string reqcont)
{
try
{
using (System.IO.StreamWriter sw = System.IO.File.AppendText(Application.StartupPath + "\\ErrorLogs\\" + string.Format("{0:yyyy-MM-dd}", System.DateTime.Now) + ".txt"))
{
sw.WriteLine("---------" + string.Format("{0:yyyy-MM-dd HH:mm:ss}", System.DateTime.Now) + "---------");
sw.WriteLine("reqcont:");
sw.WriteLine(reqcont);
sw.Flush();
sw.Close();
}
}
catch
{
}
} void moveToErrfiles(string dir, string filename)
{
//将出错的文件移到/errfiles下
ThreadStart ths = new ThreadStart(domoveTo); Thread t = new Thread(ths);
t.Name = dir + "&" + filename; t.Start();
} void domoveTo()
{
string dir = "";
string filename = "";
Thread nowth = Thread.CurrentThread;
string[] tmp = nowth.Name.Split('&');
dir = tmp[0];
filename = tmp[1]; Thread.Sleep(10000);
try
{
string folder = Application.StartupPath + "\\errfiles\\" + DateTime.Now.ToString("yyyyMMdd");
if (!Directory.Exists(folder))
Directory.CreateDirectory(folder); if (File.Exists(folder + "\\" + filename))
File.Delete(folder + "\\" + filename); File.Move(dir + "\\" + filename, folder + "\\" + filename);
}
catch (Exception ex)
{
WriteErrorLog(ex.ToString() + filename); } }
}
}
List<String> ExpendingFiles
...//检查哪个线程空闲,新开线程
for (int ii = 0; ii < threadnum; ii++)
{
if (theth[ii].IsAlive == false)
{
string infile = dir.FullName + "\\" + fn[i].Name;
if (ExpendingFiles.IndexOf("infile")==-1)
ExpendingFiles.Add(infile);
else
{
i++;
continue;
}
Insert ins = new Insert();
... }
{
string infile = dir.FullName + "\\" + fn[i].Name;
...
break;
}
那么就不需要用using了,特别是嵌套的using。(生成的IL会让你很郁闷)
自己手动Try/Catch/Finally...多好
if (aa.TotalMinutes >= 5)
{ lock(o){...}}
t.Enabled = false;
inputGameData();
t.Enabled = true;timer的频率设得成的是1秒。当30个线程都在工作时,inputGameData()会结束,一秒后,新的inputGameData()又会开始执行,但是这个时候,那30个线程有部分线程可能还没有解压完。这个时候就会重复入库了。谢谢大家