开发者

HOW-TO: Client connection manager for Boost::asio?

开发者 https://www.devze.com 2023-01-11 23:33 出处:网络
I created a server using boost::asio. When a client connects it sends a file_size, file_name and the file_data. The server stores this in a file on disk. This works perfectly! Though now I'm running b…

I created a server using boost::asio. When a client connects it sends a file_size, file_name and the file_data. The server stores this in a file on disk. This works perfectly! However, I'm currently running both the client and the server in the main thread of their respective applications (so I've got a server app and a client app), which blocks the rest of each application from executing.

So in abstract I want to create something like this:

server app

  • have one thread to receive and handle all incoming file transfers
  • have another thread in which the rest of the application can do whatever it wants to

client app

  • when I press the space bar, or whenever I want, I want to send a file to the server in a separate thread from the main one, so my application can continue doing the other things it needs to do.

My question: how do I create a manager for my client file transfers?

File transfer server accepts new file transfer client connections

#include "ofxFileTransferServer.h"

/**
 * Creates the file transfer server and opens its acceptor on a TCP port.
 *
 * The acceptor is constructed on an IPv4 endpoint for nPort. The third
 * acceptor argument (`true`) is Boost.Asio's reuse_address flag.
 *
 * NOTE(review): `io_service` is used to construct `acceptor`, so it is
 * presumably declared before `acceptor` in the class - confirm in the header.
 *
 * @param nPort TCP port to listen on; also stored in `port`.
 */
ofxFileTransferServer::ofxFileTransferServer(unsigned short nPort)
    :acceptor(
        io_service
        ,boost::asio::ip::tcp::endpoint(
            boost::asio::ip::tcp::v4()
            ,nPort
        )
        ,true
    )
    ,port(nPort)
{
}

// test
void ofxFileTransferServer::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferServer::accept
        ,this
    ));
}


void ofxFileTransferServer::accept() {
    ofxFileTransferConnection::pointer new_connection(new ofxFileTransferConnection(io_service));
    acceptor.async_accept(
                    new_connection->socket()
                    ,boost::bind(
                        &ofxFileTransferServer::handleAccept
                        ,this
                        ,new_connection
                        ,boost::asio::placeholders::error
                    )
    );
    std::cout << __FUNCTION__ << " start accepting " << std::endl;
    io_service.run();
}


/**
 * Completion handler for async_accept.
 *
 * Logs the result. On success it starts the accepted connection and queues
 * another async_accept with a fresh connection object; on error it does
 * nothing further, so no new accept is queued.
 *
 * @param pConnection connection whose socket was given to the accept call.
 * @param rErr        result of the accept operation.
 */
void ofxFileTransferServer::handleAccept(
            ofxFileTransferConnection::pointer pConnection
            ,const boost::system::error_code& rErr
)
{
    std::cout << __FUNCTION__ << " " << rErr << ", " << rErr.message() << std::endl;

    if(rErr) {
        return;
    }

    pConnection->start();

    // Prepare the connection object for the next incoming client.
    ofxFileTransferConnection::pointer upcoming(
        new ofxFileTransferConnection(io_service)
    );
    acceptor.async_accept(
        upcoming->socket(),
        boost::bind(
            &ofxFileTransferServer::handleAccept,
            this,
            upcoming,
            boost::asio::placeholders::error
        )
    );
}

File transfer client

#include "ofxFileTransferClient.h"
#include "ofMain.h"

using boost::asio::ip::tcp;

/**
 * Prepares a client for a single file transfer; nothing is opened or
 * connected here - that happens in start().
 *
 * @param rIOService io_service used to construct the resolver and the socket.
 * @param sServer    server host passed to the resolver query in start().
 * @param nPort      port/service string passed to the resolver query.
 * @param sFilePath  path of the file to send (start() resolves it through
 *                   ofToDataPath()).
 */
ofxFileTransferClient::ofxFileTransferClient(
    boost::asio::io_service &rIOService,
    const std::string sServer,
    const std::string nPort,
    const std::string sFilePath
)
    : resolver_(rIOService)
    , socket_(rIOService)
    , file_path_(sFilePath)
    , server_(sServer)
    , port_(nPort)
{
}

/** Prints a trace line so the destruction of a client is visible on stdout. */
ofxFileTransferClient::~ofxFileTransferClient() {
    std::cout << "~~~~ ofxFileTransferClient";
    std::cout << std::endl;
}

/**
 * Begins the transfer: opens the source file, builds the request header
 * ("<file_path>\n<file_size>\n\n") and starts resolving the server address.
 *
 * Returns without starting any network activity when the file cannot be
 * opened or its size cannot be determined. The remaining steps continue in
 * handleResolve(), which is bound to shared_from_this() so the client stays
 * alive while the operation is pending.
 */
void ofxFileTransferClient::start() {
    // Open at the end (ios_base::ate) so tellg() reports the file size.
    source_file_stream_.open(
                    ofToDataPath(file_path_).c_str()
                    ,std::ios_base::binary | std::ios_base::ate
    );
    if(!source_file_stream_) {
        std::cout << ">> failed to open:" << file_path_ << std::endl;
        return;
    }

    // tellg() reports failure as position -1; converting that straight to an
    // unsigned size would announce a bogus, huge file size to the server.
    const std::streampos end_pos = source_file_stream_.tellg();
    if(end_pos == std::streampos(-1)) {
        std::cout << ">> failed to determine size of:" << file_path_ << std::endl;
        return;
    }
    const size_t file_size = static_cast<size_t>(static_cast<std::streamoff>(end_pos));
    source_file_stream_.seekg(0);

    // send file size and name to server.
    std::ostream request_stream(&request_);

    request_stream  << file_path_ << "\n"
                    << file_size << "\n\n";

    std::cout   << ">> request_size:"   << request_.size() 
                << " file_path: " << file_path_
                << " file_size: "<< file_size
                << std::endl;

    // resolve ofxFileTransferServer
    tcp::resolver::query query(server_, port_);
    resolver_.async_resolve(
                query
                ,boost::bind(
                        &ofxFileTransferClient::handleResolve
                        ,shared_from_this()
                        ,boost::asio::placeholders::error
                        ,boost::asio::placeholders::iterator
                )
    );

}


/**
 * Completion handler for async_resolve.
 *
 * On success it starts connecting to the first resolved endpoint and hands
 * the iterator to the next endpoint on to handleConnect(); on failure it
 * only logs the error message.
 *
 * @param rErr        result of the resolve operation.
 * @param oEndPointIt iterator to the first resolved endpoint.
 */
void ofxFileTransferClient::handleResolve(
                const boost::system::error_code& rErr
                ,tcp::resolver::iterator oEndPointIt
)
{
    if(rErr) {
        std::cout << ">> error: " << rErr.message() << std::endl;
        return;
    }

    // Take the current endpoint, then advance so the handler receives the
    // iterator to the endpoint that follows it.
    const tcp::endpoint oFirst = *oEndPointIt;
    ++oEndPointIt;
    socket_.async_connect(
        oFirst,
        boost::bind(
            &ofxFileTransferClient::handleConnect,
            shared_from_this(),
            boost::asio::placeholders::error,
            oEndPointIt
        )
    );
}

/**
 * Completion handler for async_connect.
 *
 * - Connected: writes the prepared request header; handleFileWrite() then
 *   takes over.
 * - Failed, but more endpoints remain: closes the socket and tries the next
 *   endpoint.
 * - Failed with no endpoints left: logs the error message.
 *
 * @param rErr        result of the connect attempt.
 * @param oEndPointIt iterator to the next endpoint to try (end iterator when
 *                    there is none).
 */
void ofxFileTransferClient::handleConnect(
                const boost::system::error_code& rErr
                ,tcp::resolver::iterator oEndPointIt
)
{
    if(!rErr) {
        std::cout << ">> connected!" << std::endl;
        boost::asio::async_write(
            socket_,
            request_,
            boost::bind(
                &ofxFileTransferClient::handleFileWrite,
                shared_from_this(),
                boost::asio::placeholders::error
            )
        );
        return;
    }

    // A default-constructed resolver iterator marks the end of the list.
    tcp::resolver::iterator itEnd;
    if(oEndPointIt == itEnd) {
        std::cout << ">> error: " << rErr.message() << std::endl;
        return;
    }

    // connection failed, try next endpoint in list
    socket_.close();
    const tcp::endpoint oNext = *oEndPointIt;
    ++oEndPointIt;
    socket_.async_connect(
        oNext,
        boost::bind(
            &ofxFileTransferClient::handleConnect,
            shared_from_this(),
            boost::asio::placeholders::error,
            oEndPointIt
        )
    );
}

/**
 * Completion handler for every async_write of this transfer.
 *
 * Each successful call reads the next chunk of the source file into buf_ and
 * queues it for sending, with this function as the completion handler again.
 * The chain stops when the file is exhausted or an error is reported.
 *
 * @param rErr result of the write that just completed.
 */
void ofxFileTransferClient::handleFileWrite(
                const boost::system::error_code& rErr
)
{
    if(rErr) {
        std::cout << ">> error:" << rErr.message() << std::endl;
        return;
    }
    if(source_file_stream_.eof()) {
        return; // eof(): the whole file has been handed to the socket
    }

    source_file_stream_.read(buf_.c_array(), buf_.size());
    const std::streamsize bytes_read = source_file_stream_.gcount();
    if(bytes_read <= 0) {
        // An empty file, or one whose size is an exact multiple of the
        // buffer size, only reaches EOF on this zero-byte read. That is a
        // normal end of transfer, not a read error.
        if(!source_file_stream_.eof()) {
            std::cout << ">> read file error." << std::endl;
        }
        return;
    }

    std::cout << ">> send: " << bytes_read << " bytes, total: " << source_file_stream_.tellg() << " bytes\n";
    boost::asio::async_write(
            socket_
            ,boost::asio::buffer(buf_.c_array(), static_cast<std::size_t>(bytes_read))
            ,boost::bind(
                &ofxFileTransferClient::handleFileWrite
                // shared_from_this() (not raw `this`) keeps the client alive
                // until the pending write completes, as the other handlers do.
                ,shared_from_this()
                ,boost::asio::placeholders::error
            )
    );
}

And a tiny manager to manage client transfers (which is used in the client app). Again, the threading code is only for testing purposes and isn't used.

#include "ofxFileTransferManager.h"

/** Creates the manager; no member is initialised explicitly here. */
ofxFileTransferManager::ofxFileTransferManager()
{
}

/**
 * Sends one file to a server and blocks until the transfer's handlers have
 * all run.
 *
 * NOTE(review): reset() must not overlap with another thread that is inside
 * io_service_.run() (e.g. the thread started by startThread()) - confirm the
 * two are never used together.
 *
 * @param sServer host name or address of the server.
 * @param nPort   port/service string of the server.
 * @param sFile   path of the file to send.
 */
void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
    ));
    client->start();
    // Once run() has returned because it ran out of work, the io_service is
    // left in the stopped state and further run() calls return immediately.
    // reset() clears that state so the second and later transfers execute.
    io_service_.reset();
    io_service_.run();
}

void ofxFileTransferManager::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferManager::run
        ,this
    ));
}

/**
 * Thread body: processes io_service_ work forever, polling for new work
 * every 250 ms and printing a "." after each pass.
 *
 * This function never returns.
 *
 * NOTE(review): reset() must not overlap with another thread that is inside
 * io_service_.run() - confirm no other thread runs this io_service.
 */
void ofxFileTransferManager::run() {
    std::cout << "starting filemanager" << std::endl;
    while(true) {
        // run() returns as soon as there is no work left and leaves the
        // io_service stopped; without reset() every later run() would return
        // immediately and transfers queued afterwards would never execute.
        io_service_.reset();
        io_service_.run();
        boost::this_thread::sleep(boost::posix_time::milliseconds(250)); 
        std::cout << ".";
    }
}

It would be awesome if someone could help me out here. The Boost examples all use a "one-time" client connection, which doesn't really help me any further.

roxlu


Great! I just figured it out. I had to give my io_service a boost::asio::io_service::work object (so that run() does not return when there is no pending work), and I had forgotten a shared_from_this() somewhere. I've uploaded my code here: http://github.com/roxlu/ofxFileTransfer

For convenience here is the manager code:

#include "ofxFileTransferManager.h"



/**
 * Creates the manager and attaches a work object to its io_service, so that
 * io_service_.run() does not return merely because no transfer is pending.
 *
 * NOTE(review): work_ is constructed from io_service_, so io_service_ is
 * presumably declared before work_ in the class - confirm in the header.
 */
ofxFileTransferManager::ofxFileTransferManager()
    : work_(io_service_)
{
}

void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
            ,const std::string sRemoteFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
        ,sRemoteFile
    ));
    client->start();
}

void ofxFileTransferManager::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferManager::run
        ,this
    ));
}

/**
 * Runs the io_service event loop on the calling thread.
 *
 * startThread() binds this function as the body of the thread it creates.
 */
void ofxFileTransferManager::run() {
    io_service_.run();
}


From what I can tell, all you really need is to create a new thread and call io_service.run() in its main loop.

Obviously, you would have to use mutexes to protect any classes and variables that are shared between the app's main thread and asio's thread.

Edit: Something like this?

// Semaphore posted by transferFile() after it starts a client and waited on
// by asioThread() before each io_service.run() call.
// NOTE(review): identifiers containing a double underscore are reserved for
// the implementation in C++; consider renaming (e.g. s_semSendFile).
static sem_t __semSendFile;

/**
 * pthread entry point: waits until transferFile() signals the semaphore,
 * then runs the io_service until it is out of work, and repeats forever.
 *
 * The void* argument is unused. The loop never exits, so the final return
 * only satisfies the pthread start-routine signature.
 */
static void* asioThread(void*)
{
    while( true )
    {
        sem_wait( &__semSendFile );
        // run() leaves the io_service stopped once it runs out of work;
        // reset() is required before it can be run again, otherwise every
        // transfer after the first would be ignored.
        io_service.reset();
        io_service.run();
    }
    return NULL;
}

void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
    ));
    client->start();
    sem_post( &__semSendFile );
}

int main(int argc, char **argv)
{
    if ( sem_init( &__semSendFile, 0, 0 ) != 0 )
    {
        std::cerr << strerror( errno ) << std::endl;
        return -1;
    }

    pthread_t thread;
    if ( pthread_create( &thread, NULL, asioThread, NULL ) != 0 )
    {
        std::cerr << strerror( errno ) << std::endl;
        return -1;
    }

 [...]
0

精彩评论

暂无评论...
验证码 换一张
取 消