zoukankan      html  css  js  c++  java
  • 多线程网络文件传输2

    多线程网络文件传输

    效果

    【】sender 文件发送

    可以选择多个文件,每个文件路径动态生成一个进度条和一个runner,并且将runner加入manager。

     

    void AndySender::on_pbt_selectFile_clicked()

    {

    QFileDialog fileDialog(this);

     

    //设置窗口的标题

    fileDialog.setWindowTitle("选择要发送的文件");

    fileDialog.setNameFilter("所有文件(*.*)"); //设置一个过滤器

    //这个标志用来设置选择的类型,比如默认是单个文件。QFileDialog::ExistingFiles 多个文件,还可以用来选择文件夹QFileDialog::Directory。QFileDialog::ExistingFile 单个文件。注意这个ExistingFile,单词后面多了一个s 表示选择多个文件。要看清楚了。

    fileDialog.setFileMode(QFileDialog::ExistingFiles);

    //弹出对话框

    if (fileDialog.exec() == QDialog::Accepted)

    {

     

    TaskManager *manager =TaskManager::getInstance();

    //strPathList 返回值是一个list,如果是单个文件选择的话,只要取出第一个来就行了。

    QStringList strPathList = fileDialog.selectedFiles();

    foreach(QString path , strPathList)

    {

    qDebug()<<"添加文件上传任务:"<<path;

    AndyProgressBar * progressBar =new AndyProgressBar();

    TaskRunner * runner =new TaskRunner;

    QString hostIp =ui->lineEdit->text();

    if(hostIp=="")

    {

    hostIp ="127.0.0.1";

    }

    runner->setHostIp(hostIp);

    runner->setTask(path,TaskRunner::uploadFile);

    connect(runner,SIGNAL(taskFinish(void*)),runner,SLOT(deleteLater()));

    connect(runner,SIGNAL(UpdatePercent(int)), progressBar,SLOT (on_UpdatePercent(int)));

    connect(runner,SIGNAL(UpdateMaximum(int)), progressBar,SLOT (on_UpdateMaximum(int)));

    connect(runner,SIGNAL(UpdateText(QString)), progressBar,SLOT(on_UpdateText(QString)));

     

    progressBar->SetMaxRange(100);

    progressBar->SetFileName(path.split("/").last());

    ui->vl_content->addWidget(progressBar);

    manager->addTask(runner);

    }

    }

    }

     

    【】TaskRunner 建立tcpsocket

    建立tcpsocket

     

    TaskRunner::TaskRunner()

    {

    send = new QTcpSocket(this);

    fileBytes = sentBytes = restBytes = 0;

    loadBytes = LOADBYTES;

    /* 连接已建立 -> 开始发数据 */

    connect(send, SIGNAL(connected()),

    this, SLOT(start_transfer()));

    /* 数据已发出 -> 继续发 */

    connect(send, SIGNAL(bytesWritten(qint64)),

    this, SLOT(continue_transfer(qint64)));

    /* socket出错 -> 错误处理 */

    connect(send, SIGNAL(error(QAbstractSocket::SocketError)),

    this, SLOT(show_error(QAbstractSocket::SocketError)));

    }

     

    发送文件首部
    

    /*
    						首部
    								=
    										总大小
    												+
    														文件名长度
    																+
    																		文件名
    																				*/
    																			

    out << qint64(0) << qint64(0) << sfName;

    /* 总大小加上首部的大小 */

    fileBytes += buf.size();

     

    TaskRunner::~TaskRunner()

    {

    SafeDelete(send);

    }

     

    void TaskRunner::setHostIp(QString ip)

    {

    _ip =ip;

    }

    void TaskRunner::on_DoRequest()

    {

    if(type ==uploadFile)

    {

    // 建立连接 tcp

    send->connectToHost(QHostAddress(_ip), PORT);

    if(send->waitForConnected(1000))

    {

    qDebug()<<"连接服务器成功!";

     

    sentBytes = 0;

    }

     

    }

    }

    void TaskRunner::setTask(QString path,TaskType _type)

    {

    fileName =path;

    type =_type;

    emit UpdateText("任务等待中");

    }

    /*--- 开始传送 ---*/

    void TaskRunner::start_transfer()

    {

    file = new QFile(fileName);

    if(!file->open(QFile::ReadOnly))

    {

    emit UpdatePercent(0);

    emit UpdateText("文件打开失败");

    qDebug() << "*** start_transfer(): File-Open-Error";

    return;

    }

     

    fileBytes = file->size();

    emit UpdatePercent(0);

    emit UpdateText("连接已经建立");

     

    QByteArray buf;

    QDataStream out(&buf, QIODevice::WriteOnly);

    out.setVersion(DATA_STREAM_VERSION);

     

    /* 无路径文件名 */

    QString sfName = fileName.right(fileName.size() -

    fileName.lastIndexOf('/') - 1);

    /* 首部 = 总大小 + 文件名长度 + 文件名 */

    out << qint64(0) << qint64(0) << sfName;

    /* 总大小加上首部的大小 */

    fileBytes += buf.size();

    emit UpdateMaximum(fileBytes);

    /* 重写首部的前两个长度字段 */

    out.device()->seek(0);

    out << fileBytes << (qint64(buf.size()) - 2 * sizeof(qint64));

    /* 发送首部,计算剩余大小 */

    restBytes = fileBytes - send->write(buf);

    }

     

    /*--- 继续传输 ---*/

    void TaskRunner::continue_transfer(qint64 sentSize)

    {

    sentBytes += sentSize;

    emit UpdatePercent(sentBytes);

    /* 还有数据要发 */

    if(restBytes > 0)

    {

    /* 从文件读数据 */

    QByteArray buf = file->read(qMin(loadBytes, restBytes));

    /* 发送 */

    restBytes -= send->write(buf);

    }

    else

    file->close();

    /* 全部发送完 */

    if(sentBytes == fileBytes)

    {

    send->close(); // 关socket

    fileName.clear(); // 清空文件名

    emit UpdateText("发送完成");

    emit taskFinish(nullptr);

    }

    }

     

    /*--- 出错处理 ---*/

    void TaskRunner::show_error(QAbstractSocket::SocketError)

    {

    qDebug() << "*** Socket Error";

    send->close();

    UpdateText("任务失败,稍后重试!");

    UpdatePercent(0);

    fileName.clear();

    emit taskFinish(nullptr);

    }

     

     

     

    【】TaskManager线程管理(单例)

    生成线程列表

    #pragma once

    #include "preheader.h"

     

    class TaskRunner;

    class TaskManager : public QObject

    {

    Q_OBJECT

     

    public:

    static TaskManager* getInstance();

     

     

    void addTask(TaskRunner*);

    private:

    TaskManager();

    ~TaskManager();

    static TaskManager *m_instance;

     

    QList<QThread*> _threadList;

    QList<TaskRunner*> _waittingTasks; //等待中的任务

    QMap<QThread*,TaskRunner*> _runningTasks; //进行中的任务

     

     

    private slots:

    void onUpdateTaskList();

    void on_CircleTaskList();

    };

     

        生成三十个线程(使用匿名函数),每隔200毫秒检查有没有新的task加入waitlist,新的task加入线程列表中未使用的线程,task结束时,task结束时将runningTasks更新,

    #include "TaskManager.h"

    #include "TaskRunner.h"

     

    const int MaxThreadCount =30;

    TaskManager* TaskManager::m_instance = NULL;

    TaskManager::TaskManager()

    {

    auto initThreadList=[&]()

    {

    for(int i=0; i<MaxThreadCount; i++)

    {

    QThread *thread =new QThread(this);

    thread->start();

    _threadList.push_back(thread);

    }

    };

    initThreadList();

    QTimer *timer=new QTimer();

    connect(timer,SIGNAL(timeout()), this,SLOT(on_CircleTaskList()));

    timer->start(200);

     

    }

     

    TaskManager::~TaskManager()

    {

     

    }

     

    TaskManager* TaskManager::getInstance()

    {

    if(m_instance == NULL)

    m_instance = new TaskManager();

    return m_instance;

    }

    void TaskManager::addTask(TaskRunner* runner)

    {

    _waittingTasks.push_front(runner);

    }

    void TaskManager::on_CircleTaskList()

    {

    auto circleTaskList =[&](QList<TaskRunner*>& _waitTasks, QList<QThread*> &threadList , QMap<QThread*,TaskRunner*> &runningTasks )

    {

    while(_waitTasks.count()>0 && runningTasks.count() <threadList.count())

    {

     

    TaskRunner* runner = _waitTasks.takeFirst();

    if(runner)

    {

    for(int i=0; i<threadList.count(); i++)

    {

    QThread* thread =threadList[i];

    if(runningTasks.contains(thread))

    continue;

    else

    {

    runner->moveToThread(thread);

    connect(runner, SIGNAL(taskFinish(void*)), this,SLOT(onUpdateTaskList()));

    runningTasks.insert(thread,runner);

    QTimer::singleShot(0,runner, &TaskRunner::on_DoRequest);

    break;

    }

    }

     

    }

    }

    } ;

    circleTaskList(_waittingTasks,_threadList,_runningTasks);

    }

    void TaskManager::onUpdateTaskList()

    {

    TaskRunner * runner =(TaskRunner*)sender();

     

    if(runner)

    {

    for(auto itr =_runningTasks.begin(); itr !=_runningTasks.end(); itr++)

    {

    if(itr.value() ==runner)

    {

    _runningTasks.erase(itr++);

    break;

    }

    }

    runner->deleteLater();

    }

    }

     

     

    【】Receiver

     

    【】TcpServer accept incoming TCP connections

    为每一个连接生成一个TcpSocket 和一个线程,然后moveToThread,加入列表 QHash<int,TcpSocket *> * tcpClient;,在线程里面使用这个socket,必须使用incomingConnection,tcp断开后通知线程管理类,

     

    Note: If you want to handle an incoming connection as a new QTcpSocket object in another thread you have to pass the socketDescriptor to the other thread and create the QTcpSocket object there and use its setSocketDescriptor() method.

    #ifndef TCPSERVER_H

    #define TCPSERVER_H

     

    #include <QTcpServer>

    #include <QHash>

    #include "tcpsocket.h"

     

     

    //继承QTCPSERVER以实现多线程TCPscoket的服务器。

    //如果socket的信息处理直接处理的话,很多新建的信号和槽是用不到的

    class TcpServer : public QTcpServer

    {

    Q_OBJECT

    public:

    explicit TcpServer(QObject *parent = 0,int numConnections = 10000);

    ~TcpServer();

     

    void setMaxPendingConnections(int numConnections);//重写设置最大连接数函数

    signals:

    void connectClient(const int , const QString & ,const quint16 );//发送新用户连接信息

    void readData(const int,const QString &, quint16, const QByteArray &);//发送获得用户发过来的数据

    void sockDisConnect(int ,QString ,quint16);//断开连接的用户信息

    void sentData(const QByteArray &,const int);//向scoket发送消息

    void sentDisConnect(int i); //断开特定连接,并释放资源,-1为断开所有。

     

    void UpdateText(QString);

    void UpdateMaximum(int);

    void UpdatePercent(int);

    void createNewTask(TcpSocket*);

    public slots:

    void clear(); //断开所有连接,线程计数器请0

    protected slots:

    void sockDisConnectSlot(int handle,const QString & ip, quint16 prot, QThread *th);//断开连接的用户信息

     

    protected:

    void incomingConnection(qintptr socketDescriptor);//覆盖已获取多线程

    private:

    QHash<int,TcpSocket *> * tcpClient;//管理连接的map

    int maxConnections;

     

    };

     

    #endif // TCPSERVER_H

     

     

    【】TcpSocket 在线程使用的socket,接受文件

    #include "tcpsocket.h"

    #include <QtConcurrent/QtConcurrent>

    #include <QHostAddress>

    #include <QDebug>

    extern QString _saveDir;

    const quint16 PORT = 3333;

    const qint64 LOADBYTES = 4 * 1024; // 4 kilo-byte

    const int DATA_STREAM_VERSION = QDataStream::Qt_4_8;

    TcpSocket::TcpSocket(qintptr socketDescriptor, QObject *parent) : //构造函数在主线程执行,lambda在子线程

    QTcpSocket(parent),socketID(socketDescriptor)

    {

    this->setSocketDescriptor(socketDescriptor);

    connect(this,&TcpSocket::readyRead,this,&TcpSocket::readData);

    connect(this,SIGNAL(error(QAbstractSocket::SocketError)),

    this, SLOT(receive_error(QAbstractSocket::SocketError)));

     

    emit UpdateText("开始接收数据!") ;

    receive_gotBytes=0;

    }

     

    TcpSocket::~TcpSocket()

    {

    }

     

    /*--- 出错处理 ---*/

    void TcpSocket::receive_error(QAbstractSocket::SocketError)

    {

    qDebug() << "*** Socket Error ***" << this->errorString();

    this->close(); // 关cocket

     

    receive_fileName.clear(); // 清空文件名

    receive_fileBytes = receive_gotBytes = receive_nameSize = 0;

    emit UpdateText("接收数据失败!") ;

     

    SafeDelete(receive_file);

    }

    void TcpSocket::readData()

    {

    QDataStream in(this);

    in.setVersion(DATA_STREAM_VERSION);

     

     

    /* 首部未接收/未接收完 */

    if(receive_gotBytes <= 2 * sizeof(qint64))

    {

    if(!receive_nameSize) // 前两个长度字段未接收

    {

    if(this->bytesAvailable() >= 2 * sizeof(qint64))

    {

    in >> receive_fileBytes >> receive_nameSize;

    receive_gotBytes += 2 * sizeof(qint64);

    emit UpdateMaximum(receive_fileBytes);

    emit UpdatePercent(receive_gotBytes);

    }

    else // 数据不足,等下次

    {

    qDebug()<<" 数据不足文件名长度,等下次 Errir 044";

    return;

    }

    }

    else if(this->bytesAvailable() > receive_nameSize)

    {

    in >> receive_fileName;

    receive_gotBytes += receive_nameSize;

    emit UpdatePercent(receive_gotBytes);

    qDebug()<< "--- File Name: "

    << receive_fileName;

    }

    else // 数据不足文件名长度,等下次

    {

    qDebug()<<" 数据不足文件名长度,等下次 Errir 046";

    return;

    }

    }

     

    /* 已读文件名、文件未打开 -> 尝试打开文件 */

    if(!receive_fileName.isEmpty() && receive_file == Q_NULLPTR)

    {

    QString saveFilePath =_saveDir+(receive_fileName);

    qDebug()<<_saveDir<<"保存路径为 :"<< saveFilePath;

    receive_file = new QFile(saveFilePath);

    if(!receive_file->open(QFile::WriteOnly)) // 打开失败

    {

    qDebug() << "*** File Open Failed ***" ;

    SafeDelete(receive_file);

    return;

    }

    emit UpdateText(QString("文件%1打开成功!").arg(saveFilePath)) ;

    }

    if(receive_file == Q_NULLPTR) // 文件未打开,不能进行后续操作

    return;

     

    if(receive_gotBytes < receive_fileBytes) // 文件未接收完

    {

    receive_gotBytes += this->bytesAvailable();

    emit UpdatePercent(receive_gotBytes);

    receive_file->write(this->readAll());

    }

    if(receive_gotBytes == receive_fileBytes) // 文件接收完

    {

    this->close(); // 关socket

    receive_file->close(); // 关文件

    SafeDelete(receive_file);

    emit UpdatePercent(receive_gotBytes);

    emit UpdateText("文件接收完成!") ;

    }

    }

     

    【】ThreadHandle 为每一个接收文件的socket设置线程

     

    #include "threadhandle.h"

    #include "eventdispatcher_libev/eventdispatcher_libev.h"

     

    ThreadHandle::ThreadHandle()

    {

    initfist = false;

    }

     

    ThreadHandle::~ThreadHandle() //停止所有线程,并释放资源

    {

    QThread * tmp;

    for (auto it = threadSize.begin(); it != threadSize.end(); ++it)

    {

    tmp = it.key();

    tmp->exit();

    tmp->wait(3000);

    delete tmp;

    }

    }

     

    ThreadHandle & ThreadHandle::getClass()

    {

    static ThreadHandle th;

    return th;

    }

     

    QThread *ThreadHandle::getThread()

    {

    if (!initfist)

    {

    initThreadType(THREADSIZE,10);

    }

    if (type == THREADSIZE)

    return findThreadSize();

    else

    return findHandleSize();

    }

     

    void ThreadHandle::removeThread(QThread * thread)

    {

    auto t = threadSize.find(thread);

    if (t != threadSize.end())

    {

    t.value() --;

    if (type == HANDLESIZE && t.value() == 0 && threadSize.size() > 1)

    {

    threadSize.remove(thread);

    thread->exit();

    thread->wait(3000);

    delete thread;

    }

    }

    }

     

    void ThreadHandle::initThreadType(ThreadType type, unsigned int max)

    {

    if (!initfist)

    {

    this->type = type;

    this->size = max;

    if (this->size == 0)

    {

    if(type == THREADSIZE)

    this->size = 10;

    else

    this->size = 1000;

    }

     

    if (type == THREADSIZE)

    initThreadSize();

    else

    {

    QThread * tmp = new QThread;

    #ifndef Q_OS_WIN

    tmp->setEventDispatcher(new EventDispatcherLibEv());

    #endif

    threadSize.insert(tmp,0);

    tmp->start();

    }

    }

    initfist = true;

    }

     

    void ThreadHandle::initThreadSize() //建立好线程并启动,

    {

    QThread * tmp;

    for (unsigned int i = 0; i < size;++i)

    {

    tmp = new QThread;

    #ifndef Q_OS_WIN

    tmp->setEventDispatcher(new EventDispatcherLibEv());

    #endif

    threadSize.insert(tmp,0);

    tmp->start();

    }

    }

     

    QThread * ThreadHandle::findHandleSize() //查找到线程里的连接数小于最大值就返回查找到的,找不到就新建一个线程

    {

    for (auto it = threadSize.begin();it != threadSize.end() ;++it)

    {

    if (it.value() < size)

    {

    it.value() ++;

    return it.key();

    }

    }

    QThread * tmp = new QThread;

    #ifndef Q_OS_WIN

    tmp->setEventDispatcher(new EventDispatcherLibEv());

    #endif

    threadSize.insert(tmp,1);

    tmp->start();

    return tmp;

    }

     

    QThread *ThreadHandle::findThreadSize() //遍历查找所有线程中连接数最小的那个,返回

    {

    auto it = threadSize.begin();

    auto ite = threadSize.begin();

    for (++it ; it != threadSize.end(); ++it)

    {

    if (it.value() < ite.value())

    {

    ite = it;

    }

    }

    ite.value() ++;

    return ite.key();

    }

     

    void ThreadHandle::clear()//仅仅清空计数,线程不释放

    {

    for (auto it = threadSize.begin();it != threadSize.end() ;++it)

    {

    it.value() = 0;

    }

    }

     

  • 相关阅读:
    RAND函数和SRAND函数
    称丢手帕问题
    用GDB调试程序(七)
    用GDB调试程序(六)
    用GDB调试程序(三)
    用GDB调试程序(五)
    用GDB调试程序(二)
    用GDB调试程序(一)
    SOAP 简单对象访问协议
    关于angularJS绑定数据时自动转义html标签
  • 原文地址:https://www.cnblogs.com/tangyuanjie/p/13992149.html
Copyright © 2011-2022 走看看