zoukankan      html  css  js  c++  java
  • 点对点(P2P)多线程断点续传的实现

        在如今的网络应用中,文件的传送是重要的功能之一,也是共享的基础。一些重要的协议像HTTPFTP等都支持文件的传送。尤其是FTP,它的全称就是文件传送协议,当 初的工程师设计这一协议就是为了解决网络间的文件传送问题,而且以其稳定,高速,简单而一直保持着很大的生命力。作为一个程序员,使用这些现有的协议传送文件相当简单,不过,它们只适用于服务器模式中。这样,当我们想在点与点之间传送文件就不适用了或相当麻烦,有一种大刀小用的意味。笔者一直想寻求一种简单有效,且具备多线程断点续传的方法来实现点与点之间的文件传送问题,经过大量的翻阅资料与测试,终于实现了,现把它共享出来,与大家分享。
    我写了一个以此为基础的实用程序(网络传圣,包含源代码),可用了基于TCP/IP的电脑上,供大家学习。
    upload/2004_06/04062118541204.gif

    实现方法(VC++,基于TCP/IP协议)如下:
    仍釆用服务器与客户模式,需分别对其设计与编程。
    服务器端较简单,主要就是加入待传文件,监听客户,和传送文件。而那些断点续传的功能,以及文件的管理都放在客户端上。

    一、服务器端

    首先介绍服务器端:
    最开始我们要定义一个简单的协议,也就是定义一个服务器端与客户端听得懂的语言。而为了把问题简化,我就让服务器只要听懂两句话,一就是客户说我要读文件信息,二就是我准备好了,可以传文件了
    由于要实现多线程,必须把功能独立出来,且包装成线程,首先建一个监听线程,主要负责接入客户,并启动另一个客户线程。我用VC++实现如下:

    DWORD WINAPI listenthread(LPVOID lpparam)
    { 
        //由主函数传来的套接字
      SOCKET  pthis=(SOCKET)lpparam;
        //开始监听
     int rc=listen(pthis,30);
        //如果错就显示信息
        if(rc<0){
       CString aaa;
       aaa="listen错误/n";
          AfxGetMainWnd()->SendMessageToDescendants(WM_AGE1,(LPARAM)aaa.Get
    Buffer(0),1);
       aaa.ReleaseBuffer();
       return 0;
     }
        //进入循环,并接收到来的套接字
     while(1){
        //新建一个套接字,用于客户端
     SOCKET s1;
     s1=accept(pthis,NULL,NULL);
     
       //给主函数发有人联入消息
        CString aa;
        aa="一人联入!/n";
        AfxGetMainWnd()->SendMessageToDescendants(WM_AGE1,(LPARAM)aa.GetBuff
    er(0),1);
     aa.ReleaseBuffer();
     DWORD dwthread;
        //建立用户线程
     ::CreateThread(NULL,0,clientthread,(LPVOID)s1,0,&dwthread); 
     }
     return 0;
    }

    接着我们来看用户线程:
    先看文件消息类定义

    struct fileinfo
    {
     int fileno;//文件号
     int type;//客户端想说什么(前面那两句话,用1,2表示)
     long len;//文件长度
     int seek;//文件开始位置,用于多线程
     
     char name[100];//文件名
    };

    用户线程函数:

    DWORD WINAPI clientthread(LPVOID lpparam)
    {
     //文件消息
     fileinfo* fiinfo;
     //接收缓存
     char* m_buf;
     m_buf=new char[100];
     //监听函数传来的用户套接字
     SOCKET  pthis=(SOCKET)lpparam;
     //读传来的信息
     int aa=readn(pthis,m_buf,100);
     //如果有错就返回
     if(aa<0){
      closesocket (pthis);
      return -1;
     }
     //把传来的信息转为定义的文件信息
     fiinfo=(fileinfo*)m_buf;
     CString aaa;
     //检验客户想说什么
     switch(fiinfo->type)
     {
     //我要读文件信息
     case 0:
     //读文件
     aa=sendn(pthis,(char*)zmfile,1080);
     //有错
     if(aa<0){ 
      closesocket (pthis);
      return -1;
     }
     //发消息给主函数
     aaa="收到LIST命令/n";
         AfxGetMainWnd()->SendMessageToDescendants(WM_AGE1,(LPARAM)aaa.GetBu
    ffer(0),1);
     break;
     //我准备好了,可以传文件了
     
     case 2:
     //发文件消息给主函数
     aaa.Format("%s  文件被请求!%s/n",zmfile[fiinfo->fileno].name,nameph[fii
    nfo->fileno]);
     AfxGetMainWnd()->SendMessageToDescendants(WM_AGE1,(LPARAM)aaa.GetBuffer
    (0),1);
     //读文件,并传送
     readfile(pthis,fiinfo->seek,fiinfo->len,fiinfo->fileno);
     //听不懂你说什么
     
     default:
     aaa="接收协议错误!/n";
         AfxGetMainWnd()->SendMessageToDescendants(WM_AGE1,(LPARAM)aaa.GetBu
    ffer(0),1);
     break;
    }
     
     return 0;
    }

    读文件函数

    void readfile(SOCKET  so,int seek,int len,int fino)
    {
     //文件名
     CString myname;
     myname.Format("%s",nameph[fino]);
     CFile myFile;
     //打开文件
     myFile.Open(myname, CFile::modeRead | CFile::typeBinary|CFile::shareDen
    yNone);
     //传到指定位置 
     myFile.Seek(seek,CFile::begin);
     char m_buf[SIZE];
     int len2;
     int len1;
     len1=len;
     //开始接收,直到发完整个文件
     while(len1>0){
      len2=len>SIZE?SIZE:len;
      myFile.Read(m_buf, len2);
      int aa=sendn(so,m_buf,len2);
     if(aa<0){ 
      closesocket (so);
      break;
     }
     len1=len1-aa;
     len=len-aa;
     }
     myFile.Close();
    }

    服务器端最要的功能各技术就是这些,下面介绍客户端。

    二、客户端

    客户端最重要,也最复杂,它负责线程的管理,进度的记录等工作。

    大概流程如下:
    先连接服务器,接着发送命令1(给我文件信息),其中包括文件长度,名字等,然后根据长度决定分几个线程下载,并初使化下载进程,接着发送命令2(可以给我传文件了),并记录文件进程。最后,收尾。
    这其中有一个十分重要的类,就是cdownload类,定义如下:

    class cdownload  
    {
    public:
     void createthread();//开线程
     DWORD finish1();//完成线程
     int sendlist();//发命令1
     downinfo doinfo;//文件信息(与服务器定义一样)
     int startask(int n);开始传文件n
     long m_index;
     BOOL good[BLACK];
     int  filerange[100];
     CString fname;
     CString fnametwo;
     UINT threadfunc(long index);//下载进程
     
     int sendrequest(int n);//发文件信息
     cdownload(int thno1);
     virtual ~cdownload();
    };

    下面先介绍sendrequest(int n),在开始前,向服务器发获得文件消息命令,以便让客户端知道有哪些文件可传

    int cdownload::sendrequest(int n)
    {
     //建套接字
     sockaddr_in local;
     SOCKET m_socket;
     
     int rc=0;
     //初使化服务器地址
     local.sin_family=AF_INET;
     local.sin_port=htons(1028);
     local.sin_addr.S_un.S_addr=inet_addr(ip);
     m_socket=socket(AF_INET,SOCK_STREAM,0);
     
     
     int ret;
     //联接服务器
     ret=connect(m_socket,(LPSOCKADDR)&local,sizeof(local));
     //有错的话
     if(ret<0){
      AfxMessageBox("联接错误");
     closesocket(m_socket);
     return -1;
     }
     //初使化命令
     fileinfo fileinfo1;
     fileinfo1.len=n;
     fileinfo1.seek=50;
     fileinfo1.type=1;
     //发送命令
     int aa=sendn(m_socket,(char*)&fileinfo1,100);
     if(aa<0){
      closesocket(m_socket);
      return -1;
     }
     //接收服务器传来的信息
      aa=readn(m_socket,(char*)&fileinfo1,100);
     if(aa<0){
      closesocket(m_socket);
      return -1;
     }
     //关闭
     shutdown(m_socket,2);
     closesocket(m_socket);
     
     return 1;
    }

    有了文件消息后我们就可以下载文件了。在主函数中,用法如下:

    //下载第clno个文件,并为它建一个新cdownload
    down[clno]=new cdownload(clno);
    //开始下载,并初使化
    type=down[clno]->startask(clno);
    //建立各线程
    createthread(clno);

    下面介绍开始方法:

    //开始方法
    int cdownload::startask(int n)
    {
     //读入文件长度
     doinfo.filelen=zmfile[n].length;
     //读入名字
     fname=zmfile[n].name;
     CString tmep;
     //初使化文件名
     tmep.Format("//temp//%s",fname);
     
     //给主函数发消息
     CString aaa;
     aaa="正在读取 "+fname+" 信息,马上开始下载。。。/n";
     AfxGetMainWnd()->SendMessageToDescendants(WM_AGE1,(LPARAM)aaa.GetBuffer
    (0),1);
     aaa.ReleaseBuffer();
     //如果文件长度小于0就返回
     if(doinfo.filelen<=0) return -1;
     //建一个以.down结尾的文件记录文件信息
     CString m_temp;
     m_temp=fname+".down";
     
     doinfo.name=m_temp;
     FILE* fp=NULL;
     CFile myfile;
     //如果是第一次下载文件,初使化各记录文件
     
     if((fp=fopen(m_temp,"r"))==NULL){
     filerange[0]=0;
     //文件分块
     for(int i=0;i<BLACK;i++)
     {
      if(i>0)
       filerange[i*2]=i*(doinfo.filelen/BLACK+1);
      filerange[i*2+1]=doinfo.filelen/BLACK+1;
     }
     filerange[BLACK*2-1]=doinfo.filelen-filerange[BLACK*2-2];
     
     myfile.Open(m_temp,CFile::modeCreate|CFile::modeWrite | CFile::typeBina
    ry);
     
     //写入文件长度
     myfile.Write(&doinfo.filelen,sizeof(int));
     myfile.Close();
     
     CString temp;
     for(int ii=0;ii<BLACK;ii++){
     //初使化各进程记录文件信息(以.downN结尾)
     
     temp.Format(".down%d",ii);
     m_temp=fname+temp;
     myfile.Open(m_temp,CFile::modeCreate|CFile::modeWrite | CFile::typeBina
    ry);
     //写入各进程文件信息
     myfile.Write(&filerange[ii*2],sizeof(int));
     myfile.Write(&filerange[ii*2+1],sizeof(int));
     myfile.Close();
     }
     
     ((CMainFrame*)::AfxGetMainWnd())->m_work.m_ListCtrl->AddItemtwo(n,2,0,0
    ,0,doinfo.threadno);
     }
     else{
     //如果文件已存在,说明是续传,读上次信息
     CString temp;
     
     m_temp=fname+".down0";
     if((fp=fopen(m_temp,"r"))==NULL)
      return 1;
     else fclose(fp);
     
     int bb;
     bb=0;
     //读各进程记录的信息
     for(int ii=0;ii<BLACK;ii++)
     {
      temp.Format(".down%d",ii);
      m_temp=fname+temp;
     
      myfile.Open(m_temp,CFile::modeRead | CFile::typeBinary);
      myfile.Read(&filerange[ii*2],sizeof(int));
      myfile.Read(&filerange[ii*2+1],sizeof(int));
      myfile.Close();
     
      bb = bb+filerange[ii*2+1];
      CString temp;
     }
     if(bb==0) return 1;
     doinfo.totle=doinfo.filelen-bb;
     
     ((CMainFrame*)::AfxGetMainWnd())->m_work.m_ListCtrl->AddItemtwo(n,2,doi
    nfo.totle,1,0,doinfo.threadno);
     
     }
     
      //建立下载结束进程timethread,以管现各进程结束时间。
     DWORD dwthread;
     ::CreateThread(NULL,0,timethread,(LPVOID)this,0,&dwthread);
     
     return 0;
    }

    下面介绍建立各进程函数,很简单:

    void CMainFrame::createthread(int threadno)
    {
     DWORD dwthread;
     //建立BLACK个进程
     for(int i=0;i<BLACK;i++)
     {
      m_thread[threadno][i]= ::CreateThread(NULL,0,downthread,(LPVOID)down[t
    hreadno],0,&dwthread);
     }
    }

    downthread进程函数

    DWORD WINAPI downthread(LPVOID lpparam)
    {
     cdownload* pthis=(cdownload*)lpparam;
     //进程引索+1
     InterlockedIncrement(&pthis->m_index);
     //执行下载进程
     pthis->threadfunc(pthis->m_index-1);
     return 1;
    }

    下面介绍下载进程函数,最最核心的东西了

    UINT cdownload::threadfunc(long index)
    {
     //初使化联接
     sockaddr_in local;
     SOCKET m_socket;
     
     int rc=0;
     
     local.sin_family=AF_INET;
     local.sin_port=htons(1028);
     local.sin_addr.S_un.S_addr=inet_addr(ip);
     m_socket=socket(AF_INET,SOCK_STREAM,0);
     
     int ret;
     //读入缓存
     char* m_buf=new char[SIZE];
     int re,len2;
     fileinfo fileinfo1;
     //联接
     ret=connect(m_socket,(LPSOCKADDR)&local,sizeof(local));
     //读入各进程的下载信息
     fileinfo1.len=filerange[index*2+1];
     fileinfo1.seek=filerange[index*2];
     fileinfo1.type=2;
     fileinfo1.fileno=doinfo.threadno;
     
     re=fileinfo1.len;
     
     //打开文件 
     CFile destFile;
     FILE* fp=NULL;
     //是第一次传的话
     if((fp=fopen(fname,"r"))==NULL)
      destFile.Open(fname, CFile::modeCreate|CFile::modeWrite | CFile::typeB
    inary|CFile::shareDenyNone);
     else
      //如果文件存在,是续传
      destFile.Open(fname,CFile::modeWrite | CFile::typeBinary|CFile::shareD
    enyNone);
     //文件指针移到指定位置
     destFile.Seek(filerange[index*2],CFile::begin);
     //发消息给服务器,可以传文件了
     sendn(m_socket,(char*)&fileinfo1,100);
     
     CFile myfile;
     CString temp;
     temp.Format(".down%d",index);
     m_temp=fname+temp;
     
      //当各段长度还不为0
     while(re>0){
      len2=re>SIZE?SIZE:re;
     
      //读各段内容
      int len1=readn(m_socket,m_buf,len2);
      //有错的话
      if(len1<0){
       closesocket(m_socket);
       break;
      }
     
     //写入文件
     destFile.Write(m_buf, len1); 
     
     //更改记录进度信息
     
     filerange[index*2+1]-=len1;
     filerange[index*2]+=len1;
     //移动记录文件指针到头
     myfile.Seek(0,CFile::begin);
     //写入记录进度
     myfile.Write(&filerange[index*2],sizeof(int));
     myfile.Write(&filerange[index*2+1],sizeof(int));
     
     //减去这次读的长度
     re=re-len1;
     
     //加文件长度
     doinfo.totle=doinfo.totle+len1;
     };
     
     //这块下载完成,收尾
     
     myfile.Close();
     destFile.Close();
     delete [] m_buf;
     shutdown(m_socket,2);
     if(re<=0) good[index]=TRUE;
     return 1;
    }

    到这客户端的主要模块和机制已基本介绍完。希望好好体会一下这种多线程断点续传的方法。  

  • 相关阅读:
    git .gitignore不生效的解决方法
    python 爬虫中常需要睡眠防止被封IP time sleep
    Python 实现字典操作详解
    Python遍历字典到列表中出现覆盖前面数据或者字典对值(值为列表)赋值出现重复的问题
    小红书app之shield 逆向
    淘宝h5 页面 sign加密算法
    jemter-base64加密
    Python 中更优雅的日志记录方案
    logging再学习
    elasticsearch中must和should条件同时满足
  • 原文地址:https://www.cnblogs.com/tyjsjl/p/2156108.html
Copyright © 2011-2022 走看看