我现在在做一个多线程上传的程序,在客户端把一个文件分成几个小模块,然后多线程上传,但是,在服务器端出现数据接收不成功,单调时,还很容易出现卡死,请各位高手指点。客户端代码如下:
//打开要上传的文件
m_pGlobalFile = fopen(strLocalFileName,"rb"); if(m_pGlobalFile == NULL)
{
AfxMessageBox("open file failed");
delete []m_pSectInfo->threadFromTo;
delete m_pSectInfo;
return FALSE;
} //多线程开始上传
hEvent = CreateEvent(NULL,FALSE,FALSE,NULL);//线程编号
hMutex = CreateMutex(NULL,FALSE,NULL);//互斥使用文件
SetEvent(hEvent);
for(int i=0;i<(int)m_pSectInfo->dwNumber;i++)
{
WaitForSingleObject(hEvent,INFINITE);
m_nThreadId = i;
CWinThread* pThread = AfxBeginThread(ThreadProc,this,THREAD_PRIORITY_NORMAL,0,CREATE_SUSPENDED,NULL);
m_nRunThread++;
pThread->ResumeThread();
}客户端线程函数:
UINT CUploadFileToFTP::ThreadProc(LPVOID pVoid)
{ CUploadFileToFTP* pUploadFile = (CUploadFileToFTP*)pVoid;
int threadId = pUploadFile->m_nThreadId;//保存线程ID SetEvent(hEvent);//设置事件为有信号状态
DWORD pos = pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart;
CString revStr;
CString strThreadId;
//保存服务器地址信息的结构
sockaddr_in serverAddr;
int addrLen = sizeof(serverAddr); strThreadId.Format("%d",threadId);
revStr=pUploadFile->QueueFtpCmd(pUploadFile->m_hCmdSock,"STOR",strThreadId);
SOCKET hAcceptSock = accept(pUploadFile->m_hDataSock,(SOCKADDR*)&serverAddr,&addrLen);
if(pUploadFile->GetCode(revStr)==425)
{
AfxMessageBox("数据通道打开失败");
return 0;
}
else if(pUploadFile->GetCode(revStr)==626)
{
while(hAcceptSock ==INVALID_SOCKET)
{
Sleep(0);
hAcceptSock = accept(pUploadFile->m_hDataSock,(SOCKADDR*)&serverAddr,&addrLen);
}
}
//计算本线程要发送的文件大小
DWORD dwSize = pUploadFile->m_pSectInfo->threadFromTo[threadId].dwEnd-pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart;
while(dwSize>0)
{
WaitForSingleObject(hMutex,INFINITE);
fseek(pUploadFile->m_pGlobalFile,pos,SEEK_SET);
char buffer[BUFFSIZE];
memset(buffer,0,BUFFSIZE);
DWORD dwLen=0;
if(dwSize<BUFFSIZE)
{
dwLen =fread(buffer,sizeof(char),dwSize,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//if
else
{
dwLen = fread(buffer,sizeof(char),BUFFSIZE,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//else
ReleaseMutex(hMutex); pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart+=dwLen;
pos+=dwLen;
int len =send(hAcceptSock,buffer,dwLen,0);//向服务器端发送数据
if(len!=(int)dwLen)
{
AfxMessageBox("Upload File failed");
ReleaseMutex(hMutex);
return 0;
}
//Sleep(20);
if(pUploadFile->m_bIsStop)
{
break;
}
//Sleep(50);
}//while
// Sleep(500);
closesocket(hAcceptSock);
pUploadFile->m_nRunThread--;
return 0;服务器端接收数据函数:
void CDataSocket::OnReceive(int nErrorCode)
{
Receive();
CAsyncSocket::OnReceive(nErrorCode);
}
int CDataSocket::Receive()
{
int nRead = 0;
if (m_nStatus == XFERMODE_RECEIVE)
{
//if (m_File.m_hFile == NULL)
// return 0; byte data[PACKET_SIZE];
//byte data2[PACKET_SIZE];
memset(data,0,PACKET_SIZE);
//memset(data2,0,PACKET_SIZE);
// if(!m_nFlg)
// {
nRead = CAsyncSocket::Receive(data, PACKET_SIZE);
//m_nFlg =1;
//}
//else
//{
//nRead = CAsyncSocket::Receive(data, PACKET_SIZE);
// m_nFlg=0;
// }
switch(nRead)
{
case 0:
{
//m_File.Close();
//m_File.m_hFile = NULL;
//Close();
// tell the client the transfer is complete.
/* CString szC;
szC.Format("%d", nCount);
AfxMessageBox(szC);*/
if(m_pConnectSocket->m_pSectInfo->dwFileSize == m_pConnectSocket->m_pSectInfo->dwUploadSize)//文件全部上传完毕
{
CString strTempFileName;
CString strNewFileName;
int index;
index = m_strData.ReverseFind('.');
strTempFileName = m_strData.Mid(0,index);
strNewFileName = strTempFileName;
strTempFileName+=".ljz";
DeleteFile(strTempFileName);//删除零时文件
// MoveFile(m_strData,strNewFileName);
// fclose(fpwrite);
for(int i=0;m_pConnectSocket->FileExists(strNewFileName);i++)
{
int index =strNewFileName.Find('.');
CString str;
if(i==0)
{
str =strNewFileName.Mid(0,index);
}
else
{
str = strNewFileName.Mid(0,index-1);
}
CString str2 = strNewFileName.Mid(index);
strNewFileName.Format("%s%d%s",str,i,str2);
}
rename(m_strData,strNewFileName);
m_pConnectSocket->SendResponse("226 Transfer complete");
}
else
{
if(m_pConnectSocket->m_pSectInfo->threadFromTo[m_nThreadId].dwStart
==m_pConnectSocket->m_pSectInfo->threadFromTo[m_nThreadId].dwEnd)
{
CString strResponse;
strResponse.Format("227 子文件%d传输完毕",m_nThreadId);
m_pConnectSocket->SendResponse(strResponse);
}
else
{
CString strConfig =m_pConnectSocket->WriteConfigFile(m_pConnectSocket->m_pSectInfo);
int index = m_strData.ReverseFind('.');
CString strConfigFileName = m_strData.Mid(0,index);
strConfigFileName+=".ljz";
FILE* fpwrite = NULL;
fpwrite = fopen(strConfigFileName,"wb");
if(fpwrite !=NULL)
{
fwrite(strConfig,1,strConfig.GetLength(),fpwrite);
fclose(fpwrite);
fpwrite = NULL;
}
m_pConnectSocket->SendResponse("426 Connection closed; transfer aborted.");
}
}
// destroy this socket
AfxGetThread()->PostThreadMessage(WM_THREADMSG, 0, 0);
// upload succesfull
((CConnectThread *)AfxGetThread())->UpdateStatistic(FTPSTAT_UPLOADSUCCEEDED);
break;
}
case SOCKET_ERROR:
{
if (GetLastError() != WSAEWOULDBLOCK)
{
// m_File.Close();
//m_File.m_hFile = NULL;
// Close();
CString strConfig =m_pConnectSocket->WriteConfigFile(m_pConnectSocket->m_pSectInfo);
int index = m_strData.ReverseFind('.');
CString strConfigFileName = m_strData.Mid(0,index);
strConfigFileName+=".ljz";
FILE* fpwrite = NULL;
fpwrite = fopen(strConfigFileName,"wb");
if(fpwrite !=NULL)
{
fwrite(strConfig,1,strConfig.GetLength(),fpwrite);
fclose(fpwrite);
fpwrite = NULL;
}
m_pConnectSocket->SendResponse("426 Connection closed; transfer aborted.");
// destroy this socket
AfxGetThread()->PostThreadMessage(WM_THREADMSG, 0, 0);
// upload failed
((CConnectThread *)AfxGetThread())->UpdateStatistic(FTPSTAT_UPLOADFAILED);
//SetEvent(hEvent);
nRead = 0;
}
break;
}
default:
{
((CConnectThread *)AfxGetThread())->IncReceivedBytes(nRead); TRY
{
// m_File.Seek(m_iFileStartPos,CFile::begin);
//m_File.Write(data, nRead); //if(WAIT_OBJECT_0 == WaitForSingleObject(hEvent,INFINITE))
// {//这里有问题?
/* gFileWrite.Lock();
FILE* fpwrite=fopen(m_strData,"r+b");
if(fpwrite == NULL)
{
AfxMessageBox("写入文件失败");
}
fseek(fpwrite,m_iFileStartPos,SEEK_SET);
fwrite(data,1,nRead,fpwrite);
fclose(fpwrite);
//}
//SetEvent(hEvent);
gFileWrite.Unlock();*/
// HANDLE hFile = CreateFile(m_strData,GENERIC_WRITE,FILE_SHARE_WRITE,NULL,OPEN_EXISTING,FILE_ATTRIBUTE_NORMAL,NULL);
//SetFilePointer(hFile,m_iFileStartPos,NULL, FILE_BEGIN);
// WriteFile(hFile,data,nRead,NULL,NULL);
//CloseHandle(hFile);
//用CFille对象试试看
CFile myFile;
myFile.Open(m_strData,CFile::modeWrite|CFile::shareDenyNone|CFile::modeCreate|CFile::modeNoTruncate);
myFile.Seek(m_iFileStartPos,CFile::begin);
myFile.Write(data,PACKET_SIZE);
myFile.Close(); memset(data,0,PACKET_SIZE);
m_iFileStartPos+= nRead;
gSection.Lock();
m_pConnectSocket->m_pSectInfo->dwUploadSize+= nRead;//统计已经上传的文件大小
m_pConnectSocket->m_pSectInfo->threadFromTo[m_nThreadId].dwStart+=nRead;
gSection.Unlock();
}
CATCH_ALL(e)
{
//m_File.Close();
// m_File.m_hFile = NULL;
//Close(); m_pConnectSocket->SendResponse("450 can't access file.");
// destroy this socket
AfxGetThread()->PostThreadMessage(WM_THREADMSG, 0, 0);
// upload failed
((CConnectThread *)AfxGetThread())->UpdateStatistic(FTPSTAT_UPLOADFAILED);
//SetEvent(hEvent);
return 0;
}
END_CATCH_ALL; break;
}
}
}
return nRead;
}
//打开要上传的文件
m_pGlobalFile = fopen(strLocalFileName,"rb"); if(m_pGlobalFile == NULL)
{
AfxMessageBox("open file failed");
delete []m_pSectInfo->threadFromTo;
delete m_pSectInfo;
return FALSE;
} //多线程开始上传
hEvent = CreateEvent(NULL,FALSE,FALSE,NULL);//线程编号
hMutex = CreateMutex(NULL,FALSE,NULL);//互斥使用文件
SetEvent(hEvent);
for(int i=0;i<(int)m_pSectInfo->dwNumber;i++)
{
WaitForSingleObject(hEvent,INFINITE);
m_nThreadId = i;
CWinThread* pThread = AfxBeginThread(ThreadProc,this,THREAD_PRIORITY_NORMAL,0,CREATE_SUSPENDED,NULL);
m_nRunThread++;
pThread->ResumeThread();
}客户端线程函数:
UINT CUploadFileToFTP::ThreadProc(LPVOID pVoid)
{ CUploadFileToFTP* pUploadFile = (CUploadFileToFTP*)pVoid;
int threadId = pUploadFile->m_nThreadId;//保存线程ID SetEvent(hEvent);//设置事件为有信号状态
DWORD pos = pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart;
CString revStr;
CString strThreadId;
//保存服务器地址信息的结构
sockaddr_in serverAddr;
int addrLen = sizeof(serverAddr); strThreadId.Format("%d",threadId);
revStr=pUploadFile->QueueFtpCmd(pUploadFile->m_hCmdSock,"STOR",strThreadId);
SOCKET hAcceptSock = accept(pUploadFile->m_hDataSock,(SOCKADDR*)&serverAddr,&addrLen);
if(pUploadFile->GetCode(revStr)==425)
{
AfxMessageBox("数据通道打开失败");
return 0;
}
else if(pUploadFile->GetCode(revStr)==626)
{
while(hAcceptSock ==INVALID_SOCKET)
{
Sleep(0);
hAcceptSock = accept(pUploadFile->m_hDataSock,(SOCKADDR*)&serverAddr,&addrLen);
}
}
//计算本线程要发送的文件大小
DWORD dwSize = pUploadFile->m_pSectInfo->threadFromTo[threadId].dwEnd-pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart;
while(dwSize>0)
{
WaitForSingleObject(hMutex,INFINITE);
fseek(pUploadFile->m_pGlobalFile,pos,SEEK_SET);
char buffer[BUFFSIZE];
memset(buffer,0,BUFFSIZE);
DWORD dwLen=0;
if(dwSize<BUFFSIZE)
{
dwLen =fread(buffer,sizeof(char),dwSize,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//if
else
{
dwLen = fread(buffer,sizeof(char),BUFFSIZE,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//else
ReleaseMutex(hMutex); pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart+=dwLen;
pos+=dwLen;
int len =send(hAcceptSock,buffer,dwLen,0);//向服务器端发送数据
if(len!=(int)dwLen)
{
AfxMessageBox("Upload File failed");
ReleaseMutex(hMutex);
return 0;
}
//Sleep(20);
if(pUploadFile->m_bIsStop)
{
break;
}
//Sleep(50);
}//while
// Sleep(500);
closesocket(hAcceptSock);
pUploadFile->m_nRunThread--;
return 0;服务器端接收数据函数:
void CDataSocket::OnReceive(int nErrorCode)
{
Receive();
CAsyncSocket::OnReceive(nErrorCode);
}
int CDataSocket::Receive()
{
int nRead = 0;
if (m_nStatus == XFERMODE_RECEIVE)
{
//if (m_File.m_hFile == NULL)
// return 0; byte data[PACKET_SIZE];
//byte data2[PACKET_SIZE];
memset(data,0,PACKET_SIZE);
//memset(data2,0,PACKET_SIZE);
// if(!m_nFlg)
// {
nRead = CAsyncSocket::Receive(data, PACKET_SIZE);
//m_nFlg =1;
//}
//else
//{
//nRead = CAsyncSocket::Receive(data, PACKET_SIZE);
// m_nFlg=0;
// }
switch(nRead)
{
case 0:
{
//m_File.Close();
//m_File.m_hFile = NULL;
//Close();
// tell the client the transfer is complete.
/* CString szC;
szC.Format("%d", nCount);
AfxMessageBox(szC);*/
if(m_pConnectSocket->m_pSectInfo->dwFileSize == m_pConnectSocket->m_pSectInfo->dwUploadSize)//文件全部上传完毕
{
CString strTempFileName;
CString strNewFileName;
int index;
index = m_strData.ReverseFind('.');
strTempFileName = m_strData.Mid(0,index);
strNewFileName = strTempFileName;
strTempFileName+=".ljz";
DeleteFile(strTempFileName);//删除零时文件
// MoveFile(m_strData,strNewFileName);
// fclose(fpwrite);
for(int i=0;m_pConnectSocket->FileExists(strNewFileName);i++)
{
int index =strNewFileName.Find('.');
CString str;
if(i==0)
{
str =strNewFileName.Mid(0,index);
}
else
{
str = strNewFileName.Mid(0,index-1);
}
CString str2 = strNewFileName.Mid(index);
strNewFileName.Format("%s%d%s",str,i,str2);
}
rename(m_strData,strNewFileName);
m_pConnectSocket->SendResponse("226 Transfer complete");
}
else
{
if(m_pConnectSocket->m_pSectInfo->threadFromTo[m_nThreadId].dwStart
==m_pConnectSocket->m_pSectInfo->threadFromTo[m_nThreadId].dwEnd)
{
CString strResponse;
strResponse.Format("227 子文件%d传输完毕",m_nThreadId);
m_pConnectSocket->SendResponse(strResponse);
}
else
{
CString strConfig =m_pConnectSocket->WriteConfigFile(m_pConnectSocket->m_pSectInfo);
int index = m_strData.ReverseFind('.');
CString strConfigFileName = m_strData.Mid(0,index);
strConfigFileName+=".ljz";
FILE* fpwrite = NULL;
fpwrite = fopen(strConfigFileName,"wb");
if(fpwrite !=NULL)
{
fwrite(strConfig,1,strConfig.GetLength(),fpwrite);
fclose(fpwrite);
fpwrite = NULL;
}
m_pConnectSocket->SendResponse("426 Connection closed; transfer aborted.");
}
}
// destroy this socket
AfxGetThread()->PostThreadMessage(WM_THREADMSG, 0, 0);
// upload succesfull
((CConnectThread *)AfxGetThread())->UpdateStatistic(FTPSTAT_UPLOADSUCCEEDED);
break;
}
case SOCKET_ERROR:
{
if (GetLastError() != WSAEWOULDBLOCK)
{
// m_File.Close();
//m_File.m_hFile = NULL;
// Close();
CString strConfig =m_pConnectSocket->WriteConfigFile(m_pConnectSocket->m_pSectInfo);
int index = m_strData.ReverseFind('.');
CString strConfigFileName = m_strData.Mid(0,index);
strConfigFileName+=".ljz";
FILE* fpwrite = NULL;
fpwrite = fopen(strConfigFileName,"wb");
if(fpwrite !=NULL)
{
fwrite(strConfig,1,strConfig.GetLength(),fpwrite);
fclose(fpwrite);
fpwrite = NULL;
}
m_pConnectSocket->SendResponse("426 Connection closed; transfer aborted.");
// destroy this socket
AfxGetThread()->PostThreadMessage(WM_THREADMSG, 0, 0);
// upload failed
((CConnectThread *)AfxGetThread())->UpdateStatistic(FTPSTAT_UPLOADFAILED);
//SetEvent(hEvent);
nRead = 0;
}
break;
}
default:
{
((CConnectThread *)AfxGetThread())->IncReceivedBytes(nRead); TRY
{
// m_File.Seek(m_iFileStartPos,CFile::begin);
//m_File.Write(data, nRead); //if(WAIT_OBJECT_0 == WaitForSingleObject(hEvent,INFINITE))
// {//这里有问题?
/* gFileWrite.Lock();
FILE* fpwrite=fopen(m_strData,"r+b");
if(fpwrite == NULL)
{
AfxMessageBox("写入文件失败");
}
fseek(fpwrite,m_iFileStartPos,SEEK_SET);
fwrite(data,1,nRead,fpwrite);
fclose(fpwrite);
//}
//SetEvent(hEvent);
gFileWrite.Unlock();*/
// HANDLE hFile = CreateFile(m_strData,GENERIC_WRITE,FILE_SHARE_WRITE,NULL,OPEN_EXISTING,FILE_ATTRIBUTE_NORMAL,NULL);
//SetFilePointer(hFile,m_iFileStartPos,NULL, FILE_BEGIN);
// WriteFile(hFile,data,nRead,NULL,NULL);
//CloseHandle(hFile);
//用CFille对象试试看
CFile myFile;
myFile.Open(m_strData,CFile::modeWrite|CFile::shareDenyNone|CFile::modeCreate|CFile::modeNoTruncate);
myFile.Seek(m_iFileStartPos,CFile::begin);
myFile.Write(data,PACKET_SIZE);
myFile.Close(); memset(data,0,PACKET_SIZE);
m_iFileStartPos+= nRead;
gSection.Lock();
m_pConnectSocket->m_pSectInfo->dwUploadSize+= nRead;//统计已经上传的文件大小
m_pConnectSocket->m_pSectInfo->threadFromTo[m_nThreadId].dwStart+=nRead;
gSection.Unlock();
}
CATCH_ALL(e)
{
//m_File.Close();
// m_File.m_hFile = NULL;
//Close(); m_pConnectSocket->SendResponse("450 can't access file.");
// destroy this socket
AfxGetThread()->PostThreadMessage(WM_THREADMSG, 0, 0);
// upload failed
((CConnectThread *)AfxGetThread())->UpdateStatistic(FTPSTAT_UPLOADFAILED);
//SetEvent(hEvent);
return 0;
}
END_CATCH_ALL; break;
}
}
}
return nRead;
}
http://www.codeproject.com/KB/IP/ftpclientclass.aspxA Complete FTP Server
http://www.codeproject.com/KB/IP/ftpserver.aspxFTP Wanderer - FTP Client using WININET
http://www.codeproject.com/KB/IP/ftpwanderer.aspx
服务器端用的是CAnysSocket类的成员函数,当有数据到得时候会触发Revice这个事客户端发送数据的代码段
//计算本线程要发送的文件大小
DWORD dwSize = pUploadFile->m_pSectInfo->threadFromTo[threadId].dwEnd-pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart;
while(dwSize>0)
{
WaitForSingleObject(hMutex,INFINITE);
fseek(pUploadFile->m_pGlobalFile,pos,SEEK_SET);
char buffer[BUFFSIZE];
memset(buffer,0,BUFFSIZE);
DWORD dwLen=0;
if(dwSize <BUFFSIZE)
{
dwLen =fread(buffer,sizeof(char),dwSize,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//if
else
{
dwLen = fread(buffer,sizeof(char),BUFFSIZE,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//else
ReleaseMutex(hMutex); pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart+=dwLen;
pos+=dwLen;
int len =send(hAcceptSock,buffer,dwLen,0);//向服务器端发送数据
if(len!=(int)dwLen)
{
AfxMessageBox("Upload File failed");
ReleaseMutex(hMutex);
return 0;
}
//Sleep(20);
if(pUploadFile->m_bIsStop)
{
break;
} //Sleep(50);
}//while
应该不会的,你先看看server那边getlasterror有什么东西没?
等待中
急
多线程发送文件,在创建线程时WaitForSingleObject(hEvent,INFINITE);
之前必须有SetEvent(hEvent);
因为WaitForSingleObject(hEvent,INFINITE);
还有:这个事客户端发送数据的代码段
//计算本线程要发送的文件大小
DWORD dwSize = pUploadFile->m_pSectInfo->threadFromTo[threadId].dwEnd-pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart;
while(dwSize>0)
{
WaitForSingleObject(hMutex,INFINITE);
fseek(pUploadFile->m_pGlobalFile,pos,SEEK_SET);
char buffer[BUFFSIZE];
memset(buffer,0,BUFFSIZE);
DWORD dwLen=0;
if(dwSize <BUFFSIZE)
{
dwLen =fread(buffer,sizeof(char),dwSize,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//if
else
{
dwLen = fread(buffer,sizeof(char),BUFFSIZE,pUploadFile->m_pGlobalFile);
dwSize-=dwLen;
}//else
ReleaseMutex(hMutex); //是不是有点早?,感觉放下面pUploadFile->m_pSectInfo->threadFromTo[threadId].dwStart+=dwLen;
pos+=dwLen; /×ReleaseMutex(hMutex);×/
int len =send(hAcceptSock,buffer,dwLen,0);//向服务器端发送数据