zoukankan      html  css  js  c++  java
  • libevent简介[翻译]2 关于异步IO的简单介绍

    参考

    http://www.wangafu.net/~nickm/libevent-book/

    http://www.wangafu.net/~nickm/libevent-book/01_intro.html

    关于异步IO的一个简单介绍

    大多数初级开发者都是先接触到阻塞IO模型。如果你调用一个操作IO的函数,这个函数会一直等待,知道函数处理完成或是超时,那么这个就是同步IO操作。比如,当你调用 connect() 发起一个TCP连接,你的操作系统会发送一个SYN消息到目标机器的TCP连接队列中。这个函数会一直等待,直到目标机器返回一个SYN的确认消息或是超时,它才返回。

    这里有一个简单的同步IO操作的示例,它会连接www.google.com(也可以用www.baidu.com代替),发送一个简单的HTTP请求,把接收到的回复答应道终端。

    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <netdb.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <string.h>
    int main(int c, char **v)
    {
            const char query[] = "GET / HTTP/1.0
    "
                    "Host: www.baidu.com
    "
                    "
    ";
            const char hostname[] = "www.baidu.com";
            struct sockaddr_in sin;
            struct hostent *h;
            const char *cp;
            int fd;
            ssize_t n_written, remaining;
            char buf[1024];
            h = gethostbyname(hostname);
            if(!h)
            {
                    fprintf(stderr, "couldn't lookup %s: %s", hostname, hstrerror(h_errno));
                    return 1;
            }
            if(h->h_addrtype != AF_INET)
            {
                    fprintf(stderr, "No ipv6 support sorry");
                    return 1;
            }
            fd = socket(AF_INET, SOCK_STREAM, 0);
            if(fd < 0)
            {
                    perror("socket");
                    return 1;
            }
            sin.sin_family = AF_INET;
            sin.sin_port = htons(80);
            sin.sin_addr = *(struct in_addr*)h->h_addr;
            if(connect(fd, (struct sockaddr*)&sin, sizeof(sin)))
            {
                    perror("connect");
                    close(fd);
                    return 1;
            }
            cp = query;
            remaining = strlen(query);
            while(remaining)
            {
                    n_written = send(fd, cp, remaining, 0);
                    if(n_written <= 0)
                    {
                            perror("send");
                            return 1;
                    }
                    remaining -= n_written;
                    cp += n_written;
            }
            while(true)
            {
                    ssize_t result = recv(fd, buf, sizeof(buf), 0);
                    if(result == 0)
                    {
                            break;
                    }
                    else if (result < 0)
                    {
                            perror("recv");
                            close(fd);
                            return 1;
                    }
                    fwrite(buf, 1, result, stdout);
            }
            close(fd);
            return 0;
    }

    输出

    HTTP/1.0 200 OK
    Accept-Ranges: bytes
    Cache-Control: no-cache
    Content-Length: 14615
    Content-Type: text/html
    Date: Thu, 04 Jun 2020 07:01:09 GMT
    P3p: CP=" OTI DSP COR IVA OUR IND COM "
    P3p: CP=" OTI DSP COR IVA OUR IND COM "
    Pragma: no-cache
    Server: BWS/1.1
    Set-Cookie: BAIDUID=18C45AC3EDD5442CD36799C508785696:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com
    Set-Cookie: BIDUPSID=18C45AC3EDD5442CD36799C508785696; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com
    Set-Cookie: PSTM=1591254069; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com
    Set-Cookie: BAIDUID=18C45AC3EDD5442C8E66F5E3B5DEB750:FG=1; max-age=31536000; expires=Fri, 04-Jun-21 07:01:09 GMT; domain=.baidu.com; path=/; version=1; comment=bd
    Traceid: 159125406905797109869177069905675153395
    Vary: Accept-Encoding
    X-Ua-Compatible: IE=Edge,chrome=1
    
    <!DOCTYPE html><!--STATUS OK-->
    <html>
    <head>
            <meta http-equiv="content-type" content="text/html;charset=utf-8">
            <meta http-equiv="X-UA-Compatible" content="IE=Edge">
            <link rel="dns-prefetch" href="//s1.bdstatic.com"/>
            <link rel="dns-prefetch" href="//t1.baidu.com"/>
            <link rel="dns-prefetch" href="//t2.baidu.com"/>
            <link rel="dns-prefetch" href="//t3.baidu.com"/>
            <link rel="dns-prefetch" href="//t10.baidu.com"/>
            <link rel="dns-prefetch" href="//t11.baidu.com"/>
            <link rel="dns-prefetch" href="//t12.baidu.com"/>
            <link rel="dns-prefetch" href="//b1.bdstatic.com"/>
            <title>百度一下,你就知道</title>
            <link href="http://s1.bdstatic.com/r/www/cache/static/home/css/index.css" rel="stylesheet" type="text/css" />
            <!--[if lte IE 8]><style index="index" >#content{height:480px9}#m{top:260px9}</style><![endif]-->
            <!--[if IE 8]><style index="index" >#u1 a.mnav,#u1 a.mnav:visited{font-family:simsun}</style><![endif]-->
            <script>var hashMatch = document.location.href.match(/#+(.*wd=[^&].+)/);if (hashMatch && hashMatch[0] && hashMatch[1]) {document.location.replace("http://"+location.host+"/s?"+hashMatch[1]);}var ns_c = function(){};</script>
            <script>function h(obj){obj.style.behavior='url(#default#homepage)';var a = obj.setHomePage('//www.baidu.com/');}</script>
            <noscript><meta http-equiv="refresh" content="0; url=/baidu.html?from=noscript"/></noscript>
            <script>window._ASYNC_START=new Date().getTime();</script>
    </head>
    <body link="#0000cc"><div id="wrapper" style="display:none;"><div id="u"><a href="//www.baidu.com/gaoji/preferences.html"  onmousedown="return user_c({'fm':'set','tab':'setting','login':'0'})">搜索设置</a>|<a id="btop" href="/"  onmousedown="return user_c({'fm':'set','tab':'index','login':'0'})">百度首页</a>|<a id="lb" href="https://passport.baidu.com/v2/?login&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2F" onclick="return false;"  onmousedown="return user_c({'fm':'set','tab':'login'})">登录</a><a href="https://passport.baidu.com/v2/?reg&regType=1&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2F"  onmousedown="return user_c({'fm':'set','tab':'reg'})" target="_blank" class="reg">注册</a></div><div id="head"><div class="s_nav"><a href="/" class="s_logo" onmousedown="return c({'fm':'tab','tab':'logo'})"><img src="//www.baidu.com/img/baidu_jgylogo3.gif" width="117" height="38" border="0" alt="到百度首页" title="到百度首页"></a><div class="s_tab" id="s_tab"><a href="http://news.baidu.com/ns?cl=2&rn=20&tn=news&word=" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'news'})">新闻</a>&#12288;<b>网页</b>&#12288;<a href="http://tieba.baidu.com/f?kw=&fr=wwwt" wdfield="kw"  onmousedown="return c({'fm':'tab','tab':'tieba'})">贴吧</a>&#12288;<a href="http://zhidao.baidu.com/q?ct=17&pn=0&tn=ikaslist&rn=10&word=&fr=wwwt" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'zhidao'})">知道</a>&#12288;<a href="http://music.baidu.com/search?fr=ps&key=" wdfield="key"  onmousedown="return c({'fm':'tab','tab':'music'})">音乐</a>&#12288;<a href="http://image.baidu.com/i?tn=baiduimage&ps=1&ct=201326592&lm=-1&cl=2&nc=1&word=" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'pic'})">图片</a>&#12288;<a href="http://v.baidu.com/v?ct=301989888&rn=20&pn=0&db=0&s=25&word=" wdfield="word"   onmousedown="return c({'fm':'tab','tab':'video'})">视频</a>&#12288;<a href="http://map.baidu.com/m?word=&fr=ps01000" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'map'})">地图</a>&#12288;<a href="http://wenku.baidu.com/search?word=&lm=0&od=0" wdfield="word"  onmousedown="return c({'fm':'tab','tab':'wenku'})">文库</a>&#12288;<a href="//www.baidu.com/more/"  onmousedown="return c({'fm':'tab','tab':'more'})">更多»</a></div></div><form id="form" name="f" action="/s" class="fm" ><input type="hidden" name="ie" value="utf-8"><input type="hidden" name="f" value="8"><input type="hidden" name="rsv_bp" value="1"><span class="bg s_ipt_wr"><input name="wd" id="kw" class="s_ipt" value="" maxlength="100"></span><span class="bg s_btn_wr"><input type="submit" id="su" value="百度一下" class="bg s_btn" onmousedown="this.className='bg s_btn s_btn_h'" onmouseout="this.className='bg s_btn'"></span><span class="tools"><span id="mHolder"><div id="mCon"><span>输入法</span></div><ul id="mMenu"><li><a href="javascript:;" name="ime_hw">手写</a></li><li><a href="javascript:;" name="ime_py">拼音</a></li><li class="ln"></li><li><a href="javascript:;" name="ime_cl">关闭</a></li></ul></span><span class="shouji"><strong>推荐&nbsp;:&nbsp;</strong><a href="http://w.x.baidu.com/go/mini/8/10000020" onmousedown="return ns_c({'fm':'behs','tab':'bdbrowser'})">百度浏览器,打开网页快2秒!</a></span></span></form></div><div id="content"><div id="u1"><a href="http://news.baidu.com" name="tj_trnews" class="mnav">新闻</a><a href="http://www.hao123.com" name="tj_trhao123" class="mnav">hao123</a><a href="http://map.baidu.com" name="tj_trmap" class="mnav">地图</a><a href="http://v.baidu.com" name="tj_trvideo" class="mnav">视频</a><a href="http://tieba.baidu.com" name="tj_trtieba" class="mnav">贴吧</a><a href="https://passport.baidu.com/v2/?login&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2F" name="tj_login" id="lb" onclick="return false;">登录</a><a href="//www.baidu.com/gaoji/preferences.html" name="tj_settingicon" id="pf">设置</a><a href="//www.baidu.com/more/" name="tj_briicon" id="bri">更多产品</a></div><div id="m"><p id="lg"><img src="//www.baidu.com/img/bd_logo.png" width="270" height="129"></p><p id="nv"><a href="http://news.baidu.com">新&nbsp;闻</a> <b>网&nbsp;页</b> <a href="http://tieba.baidu.com">贴&nbsp;吧</a> <a href="http://zhidao.baidu.com">知&nbsp;道</a> <a href="http://music.baidu.com">音&nbsp;乐</a> <a href="http://image.baidu.com">图&nbsp;片</a> <a href="http://v.baidu.com">视&nbsp;频</a> <a href="http://map.baidu.com">地&nbsp;图</a></p><div id="fm"><form id="form1" name="f1" action="/s" class="fm"><span class="bg s_ipt_wr"><input type="text" name="wd" id="kw1" maxlength="100" class="s_ipt"></span><input type="hidden" name="rsv_bp" value="0"><input type=hidden name=ch value=""><input type=hidden name=tn value="baidu"><input type=hidden name=bar value=""><input type="hidden" name="rsv_spt" value="3"><input type="hidden" name="ie" value="utf-8"><span class="bg s_btn_wr"><input type="submit" value="百度一下" id="su1" class="bg s_btn" onmousedown="this.className='bg s_btn s_btn_h'" onmouseout="this.className='bg s_btn'"></span></form><span class="tools"><span id="mHolder1"><div id="mCon1"><span>输入法</span></div></span></span><ul id="mMenu1"><div class="mMenu1-tip-arrow"><em></em><ins></ins></div><li><a href="javascript:;" name="ime_hw">手写</a></li><li><a href="javascript:;" name="ime_py">拼音</a></li><li class="ln"></li><li><a href="javascript:;" name="ime_cl">关闭</a></li></ul></div><p id="lk"><a href="http://baike.baidu.com">百科</a> <a href="http://wenku.baidu.com">文库</a> <a href="http://www.hao123.com">hao123</a><span>&nbsp;|&nbsp;<a href="//www.baidu.com/more/">更多&gt;&gt;</a></span></p><p id="lm"></p></div></div><div id="ftCon"><div id="ftConw"><p id="lh"><a id="seth" onClick="h(this)" href="/" onmousedown="return ns_c({'fm':'behs','tab':'homepage','pos':0})">把百度设为主页</a><a id="setf" href="//www.baidu.com/cache/sethelp/index.html" onmousedown="return ns_c({'fm':'behs','tab':'favorites','pos':0})" target="_blank">把百度设为主页</a><a onmousedown="return ns_c({'fm':'behs','tab':'tj_about'})" href="http://home.baidu.com">关于百度</a><a onmousedown="return ns_c({'fm':'behs','tab':'tj_about_en'})" href="http://ir.baidu.com">About Baidu</a></p><p id="cp">&copy;2018&nbsp;Baidu&nbsp;<a href="/duty/" name="tj_duty">使用百度前必读</a>&nbsp;京ICP证030173号&nbsp;<img src="http://s1.bdstatic.com/r/www/cache/static/global/img/gs_237f015b.gif"></p></div></div><div id="wrapper_wrapper"></div></div><div class="c-tips-container" id="c-tips-container"></div>
    <script>window.__async_strategy=2;</script>
    <script>var bds={se:{},su:{urdata:[],urSendClick:function(){}},util:{},use:{},comm : {domain:"http://www.baidu.com",ubsurl : "http://sclick.baidu.com/w.gif",tn:"baidu",queryEnc:"",queryId:"",inter:"",templateName:"baidu",sugHost : "http://suggestion.baidu.com/su",query : "",qid : "",cid : "",sid : "",indexSid : "",stoken : "",serverTime : "",user : "",username : "",loginAction : [],useFavo : "",pinyin : "",favoOn : "",curResultNum:"",rightResultExist:false,protectNum:0,zxlNum:0,pageNum:1,pageSize:10,newindex:0,async:1,maxPreloadThread:5,maxPreloadTimes:10,preloadMouseMoveDistance:5,switchAddMask:false,isDebug:false,ishome : 1},_base64:{domain : "http://b1.bdstatic.com/",b64Exp : -1,pdc : 0}};var name,navigate,al_arr=[];var selfOpen = window.open;eval("var open = selfOpen;");var isIE=navigator.userAgent.indexOf("MSIE")!=-1&&!window.opera;var E = bds.ecom= {};bds.se.mon = {'loadedItems':[],'load':function(){},'srvt':-1};try {bds.se.mon.srvt = parseInt(document.cookie.match(new RegExp("(^| )BDSVRTM=([^;]*)(;|$)"))[2]);document.cookie="BDSVRTM=;expires=Sat, 01 Jan 2000 00:00:00 GMT"; }catch(e){}</script>
    <script>if(!location.hash.match(/[^a-zA-Z0-9]wd=/)){document.getElementById("ftCon").style.display='block';document.getElementById("u1").style.display='block';document.getElementById("content").style.display='block';document.getElementById("wrapper").style.display='block';setTimeout(function(){try{document.getElementById("kw1").focus();document.getElementById("kw1").parentNode.className += ' iptfocus';}catch(e){}},0);}</script>
    <script type="text/javascript" src="http://s1.bdstatic.com/r/www/cache/static/jquery/jquery-1.10.2.min_f2fb5194.js"></script>
    <script>(function(){var index_content = $('#content');var index_foot= $('#ftCon');var index_css= $('head [index]');var index_u= $('#u1');var result_u= $('#u');var wrapper=$("#wrapper");window.index_on=function(){index_css.insertAfter("meta:eq(0)");result_common_css.remove();result_aladdin_css.remove();result_sug_css.remove();index_content.show();index_foot.show();index_u.show();result_u.hide();wrapper.show();if(bds.su&&bds.su.U&&bds.su.U.homeInit){bds.su.U.homeInit();}setTimeout(function(){try{$('#kw1').get(0).focus();window.sugIndex.start();}catch(e){}},0);if(typeof initIndex=='function'){initIndex();}};window.index_off=function(){index_css.remove();index_content.hide();index_foot.hide();index_u.hide();result_u.show();result_aladdin_css.insertAfter("meta:eq(0)");result_common_css.insertAfter("meta:eq(0)");result_sug_css.insertAfter("meta:eq(0)");wrapper.show();};})();</script>
    <script>window.__switch_add_mask=1;</script>
    <script type="text/javascript" src="http://s1.bdstatic.com/r/www/cache/static/global/js/instant_search_newi_redirect1_20bf4036.js"></script>
    <script>initPreload();$("#u,#u1").delegate("#lb",'click',function(){try{bds.se.login.open();}catch(e){}});if(navigator.cookieEnabled){document.cookie="NOJS=;expires=Sat, 01 Jan 2000 00:00:00 GMT";}</script>
    <script>$(function(){for(i=0;i<3;i++){u($($('.s_ipt_wr')[i]),$($('.s_ipt')[i]),$($('.s_btn_wr')[i]),$($('.s_btn')[i]));}function u(iptwr,ipt,btnwr,btn){if(iptwr && ipt){iptwr.on('mouseover',function(){iptwr.addClass('ipthover');}).on('mouseout',function(){iptwr.removeClass('ipthover');}).on('click',function(){ipt.focus();});ipt.on('focus',function(){iptwr.addClass('iptfocus');}).on('blur',function(){iptwr.removeClass('iptfocus');}).on('render',function(e){var $s = iptwr.parent().find('.bdsug');var l = $s.find('li').length;if(l>=5){$s.addClass('bdsugbg');}else{$s.removeClass('bdsugbg');}});}if(btnwr && btn){btnwr.on('mouseover',function(){btn.addClass('btnhover');}).on('mouseout',function(){btn.removeClass('btnhover');});}}});</script>
    <script type="text/javascript" src="http://s1.bdstatic.com/r/www/cache/static/home/js/bri_7f1fa703.js"></script>
    <script>(function(){var _init=false;window.initIndex=function(){if(_init){return;}_init=true;var w=window,d=document,n=navigator,k=d.f1.wd,a=d.getElementById("nv").getElementsByTagName("a"),isIE=n.userAgent.indexOf("MSIE")!=-1&&!window.opera;(function(){if(/q=([^&]+)/.test(location.search)){k.value=decodeURIComponent(RegExp["x241"])}})();(function(){var u = G("u1").getElementsByTagName("a"), nv = G("nv").getElementsByTagName("a"), lk = G("lk").getElementsByTagName("a"), un = "";var tj_nv = ["news","tieba","zhidao","mp3","img","video","map"];var tj_lk = ["baike","wenku","hao123","more"];un = bds.comm.user == "" ? "" : bds.comm.user;function _addTJ(obj){addEV(obj, "mousedown", function(e){var e = e || window.event;var target = e.target || e.srcElement;if(target.name){ns_c({'fm':'behs','tab':target.name,'un':encodeURIComponent(un)});}});}for(var i = 0; i < u.length; i++){_addTJ(u[i]);}for(var i = 0; i < nv.length; i++){nv[i].name = 'tj_' + tj_nv[i];}for(var i = 0; i < lk.length; i++){lk[i].name = 'tj_' + tj_lk[i];}})();(function() {var links = {'tj_news': ['word', 'http://news.baidu.com/ns?tn=news&cl=2&rn=20&ct=1&ie=utf-8'],'tj_tieba': ['kw', 'http://tieba.baidu.com/f?ie=utf-8'],'tj_zhidao': ['word', 'http://zhidao.baidu.com/search?pn=0&rn=10&lm=0'],'tj_mp3': ['key', 'http://music.baidu.com/search?fr=ps&ie=utf-8'],'tj_img': ['word', 'http://image.baidu.com/i?ct=201326592&cl=2&nc=1&lm=-1&st=-1&tn=baiduimage&istype=2&fm=&pv=&z=0&ie=utf-8'],'tj_video': ['word', 'http://video.baidu.com/v?ct=301989888&s=25&ie=utf-8'],'tj_map': ['wd', 'http://map.baidu.com/?newmap=1&ie=utf-8&s=s'],'tj_baike': ['word', 'http://baike.baidu.com/search/word?pic=1&sug=1&enc=utf8'],'tj_wenku': ['word', 'http://wenku.baidu.com/search?ie=utf-8']};var domArr = [G('nv'), G('lk'),G('cp')],kw = G('kw1');for (var i = 0, l = domArr.length; i < l; i++) {domArr[i].onmousedown = function(e) {e = e || window.event;var target = e.target || e.srcElement,name = target.getAttribute('name'),items = links[name],reg = new RegExp('^\s+|\s+x24'),key = kw.value.replace(reg, '');if (items) {if (key.length > 0) {var wd = items[0], url = items[1],url = url + ( name === 'tj_map' ? encodeURIComponent('&' + wd + '=' + key) : ( ( url.indexOf('?') > 0 ? '&' : '?' ) + wd + '=' + encodeURIComponent(key) ) );target.href = url;} else {target.href = target.href.match(new RegExp('^http://.+.baidu.com'))[0];}}name && ns_c({'fm': 'behs','tab': name,'query': encodeURIComponent(key),'un': encodeURIComponent(bds.comm.user || '') });};}})();};if(window.pageState==0){initIndex();}})();document.cookie = 'IS_STATIC=1;expires=' + new Date(new Date().getTime() + 10*60*1000).toGMTString();</script>
    </body></html>

    所有上面示例中使用的API都是同步的,也就是阻塞的。 gethostbyname 这个函数,必须解析到了www.baidu.com的IP或是失败,才能返回; connect 必须连接上才能返回; recv 必须接收到数据或是一个关闭信号才能返回, send 必须是吧数据发送到了内核的缓冲区才能返回。

    当然,阻塞的IO请求并不是什么坏事,如果你的程序在等待API完成的过程中,并不需要做任何其他的事情,那么阻塞IO操作可以工作的非常棒。但是,如果你需要写一个程序,在同一时间操作更多的IO,更具体一点,你需要同时从两个连接中读取数据,但是,并不知道哪个连接的数据先到达,你可能写成下面糟糕的样子

    char buf[1024];
    int i, n;
    while (i_still_want_to_read()) {
        for (i=0; i<n_sockets; ++i) {
            n = recv(fd[i], buf, sizeof(buf), 0);
            if (n==0)
                handle_close(fd[i]);
            else if (n<0)
                handle_error(fd[i], errno);
            else
                handle_input(fd[i], buf, n);
        }
    }

    如果fd[2]的数据到达了,这个程序并不会去读,必须要等fd[0]和fd[1]读数据的操作完成后,才会读fd[2];如果fd[0]或是fd[1]一直没有数据到达,那么fd[2]也会一直阻塞,无法读取数据。

    对于这种情况,有些情况下,人们通过多线程或是多进程来解决,最简单的方法就是每一个线程处理一个连接。任何一个连接的阻塞并不会影响其他的连接。

    我们这有一个例子,把读取的数据做一些转换再发送给客户端,这里通过fork为每一个连接创建一个新的进程

    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <iostream>
    using namespace std;
    #define MAX_LINE 16384
    char rot13_char(char c)
    {
            if((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
                    return c+ 13;
            else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
                    return c - 13;
            else
                    return c;
    }
    void child(int fd)
    {
            char outbuf[MAX_LINE+1] = {0};
            size_t outbuf_used = 0;
            ssize_t result;
            while(true)
            {
                    char ch;
                    result = recv(fd, &ch, 1, 0);
                    if(result == 0)
                            break;
                    else if(result == -1)
                    {
                            perror("read");
                            break;
                    }
                    if(outbuf_used < sizeof(outbuf))
                    {
                            outbuf_used++;
                            outbuf[outbuf_used] = rot13_char(ch);
                    }
                    if(ch == 'q')
                    {
                            send(fd, outbuf, outbuf_used, 0);
                            outbuf_used = 0;
                            continue;
                    }
            }
    }
    void run(void)
    {
            int listener;
            struct sockaddr_in sin;
            sin.sin_family = AF_INET;
            sin.sin_addr.s_addr = 0;
            sin.sin_port = htons(40713);
            listener = socket(AF_INET, SOCK_STREAM, 0);
            if(bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0)
            {
                    perror("bind");
                    return;
            }
            if(listen(listener, 16)<0)
            {
                    perror("listener");
                    return;
            }
            while(true)
            {
                    struct sockaddr_storage ss;
                    socklen_t slen = sizeof(ss);
                    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
                    if(fd < 0)
                    {
                            perror("accept");
                    }
                    else
                    {
                            int pid = fork();
                            cout << pid << endl;
                            if(pid==0)
                            {
                                    child(fd);
                                    exit(0);
                            }
                    }
    
            }
    }
    int main(int c, char **v)
    {
            run();
            return 0;
    }

    我们是不是可以用这个完美的方案解决同时处理多个连接的问题了?是不是我就可以去干其他的事情了,不需要再写这本书了?(ps:网站作者原话翻译)等等,首先有一点需要知道,创建进程,甚至是线程,在一些平台上对资源的消耗是很昂贵的。现实中,我们会使用线程池来代替创建进程,这样就可以减少创建和释放消耗的资源。但是呢,线程的表现也没有达到我们理想的那样。如果我们的程序需要处理成千上万的连接请求时,成千上万的连接同时处理与一个cpu只处理几个连接请求,并没有特别大的区别。也就是一个cpu处理几个线程与非常多的线程,不会有更高的效率。cpu处理线程的效率并不会随着线程的增多而线性增加,由于线程间上下文切换,反而效率会降低。

    如果多线程也不能解决处理大量连接的问题,那什么可以呢?在Unix环境下,把你的socket设置成非阻塞的,可以调用下面的API来实现:

    fcntl(fd, F_SETFL, O_NONBLOCK);

    fd就是socket的文件描述符

    一个文件描述符,就是一个数字,内核把这个数字与我们打开的socket绑定到一起。使用这个数字,调用Unix的API,系统就会自动的处理对应的socket。

    一旦我们把fd(也就是这个socket)设置为了非阻塞的,从现在起,每次网络调用,都会直接完成,或是返回一个特定的错误,告诉你,我现在什么也做不了,一会再来吧。我们那个两个socket的例子就可以写成下面的样子

    /* This will work, but the performance will be unforgivably bad. */
    int i, n;
    char buf[1024];
    for (i=0; i < n_sockets; ++i)
        fcntl(fd[i], F_SETFL, O_NONBLOCK);
    
    while (i_still_want_to_read()) {
        for (i=0; i < n_sockets; ++i) {
            n = recv(fd[i], buf, sizeof(buf), 0);
            if (n == 0) {
                handle_close(fd[i]);
            } else if (n < 0) {
                if (errno == EAGAIN)
                     ; /* The kernel didn't have any data for us to read. */
                else
                     handle_error(fd[i], errno);
             } else {
                handle_input(fd[i], buf, n);
             }
        }
    }

    上面已经说了,这样做性能很差,正常不会这样写,这里仅仅是一个示例,告诉我们可以用非阻塞的IO来实现一个线程操作多个socket,并且不会出现,前一个socket没有可操作的数据,阻塞在那里,就算后面的有了可操作的数据,也必须等待的问题。

    这里性能差的地方有两点,一个是如果没有任何IO操作,这个循环会不断的执行下去,占用了CPU资源,与死循环类似。二是,不管每个socket是否有数据可以操作,我们都必须执行一个调用系统内核的函数,就算没有任何操作,这次调用也会消耗资源。我们想要的是,我们告诉内核,如果有可操作的socket,告诉我们是哪几个socket可以操作了。

    最早时期,人们解决这个问题用的是 selectselect 调用返回的时候,会给定一个socket的3个状态,可读、可写和可执行。这个调用会阻塞在那,如果有任何一个其监测的文件描述符可以操作了,它就会把它放到一个可用的列表中。这个列表中都是这种可以操作的文件描述符。

    这里有一个使用 select 的示例

    /* If you only have a couple dozen fds, this version won't be awful */
    fd_set readset;
    int i, n;
    char buf[1024];
    
    while (i_still_want_to_read()) {
        int maxfd = -1;
        FD_ZERO(&readset);
    
        /* Add all of the interesting fds to readset */
        for (i=0; i < n_sockets; ++i) {
             if (fd[i]>maxfd) maxfd = fd[i];
             FD_SET(fd[i], &readset);
        }
    
        /* Wait until one or more fds are ready to read */
        select(maxfd+1, &readset, NULL, NULL, NULL);
    
        /* Process all of the fds that are still set in readset */
        for (i=0; i < n_sockets; ++i) {
            if (FD_ISSET(fd[i], &readset)) {
                n = recv(fd[i], buf, sizeof(buf), 0);
                if (n == 0) {
                    handle_close(fd[i]);
                } else if (n < 0) {
                    if (errno == EAGAIN)
                         ; /* The kernel didn't have any data for us to read. */
                    else
                         handle_error(fd[i], errno);
                 } else {
                    handle_input(fd[i], buf, n);
                 }
            }
        }
    }

    这个操作就是把一个文件描述符加入到监听的set中,然后调用 select ,这个调用会一直阻塞在那里,直到有至少一个set中的文件描述符可以读了它才返回,并且把所有可读的文件描述符放到一个set中,然后我们遍历这个返回的set,一个个的读取数据。

    对于我们一开始的程序,可以用select实现一下

    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <fcntl.h>
    #include <sys/select.h>
    #include <assert.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <errno.h>
    #define MAX_LINE 16384
    char rot13_char(char c)
    {
        /* We don't want to use isalpha here; setting the locale would change
         * which characters are considered alphabetical. */
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    struct fd_state{
            char buffer [MAX_LINE] = {0};
            size_t buffer_used;
            int writing;
            size_t n_written;
            size_t write_upto;
    };
    
    struct fd_state * alloc_fd_state()
    {
            struct fd_state * state = (struct fd_state*)malloc(sizeof(struct fd_state));
            if(!state)
                    return NULL;
            state->buffer_used = state->n_written = state->writing = state->write_upto = 0;
            return state;
    }
    
    void free_fd_state(struct fd_state * state)
    {
            free(state);
    }
    
    void make_nonblocking(int fd)
    {
            fcntl(fd, F_SETFL, O_NONBLOCK);
    }
    
    int do_read(int fd, struct fd_state *state)
    {
            char buf[1024] = {0};
            int i;
            ssize_t result;
            while(true)
            {
                    result = recv(fd, buf, sizeof(buf), 0);
                    if(result <= 0)
                            break;
                    for(i = 0; i < result; ++i)
                    {
                            if(state->buffer_used < sizeof(state->buffer))
                                    state->buffer[state->buffer_used++] = rot13_char(buf[i]);
                            if(buf[i] == 'q')
                            {
                                    state->writing = 1;
                                    state->write_upto = state->buffer_used;
                            }
                    }
            }
            if(result == 0)
                    return 1;
            else if(result < 0)
            {
                    if(errno == EAGAIN)
                            return 0;
                    return -1;
            }
            return 0;
    }
    
    int do_write(int fd, struct fd_state * state)
    {
            while(state->n_written < state->write_upto)
            {
                    ssize_t result = send(fd, state->buffer + state->n_written,
                                    state->write_upto - state->n_written, 0);
                    if(result < 0)
                    {
                            if(errno == EAGAIN)
                                    return 0;
                            return -1;
                    }
                    assert(result != 0);
                    state->n_written += result;
            }
            if(state->n_written == state->buffer_used)
                    state->n_written = state->write_upto = state->buffer_used = 0;
            state->writing = 0;
            return 0;
    }
    void run()
    {
            int listener;
            struct fd_state * state[FD_SETSIZE];
            struct sockaddr_in sin;
            int i, maxfd;
            fd_set readset, writeset, exset;
            sin.sin_family = AF_INET;
            sin.sin_addr.s_addr = 0;
            sin.sin_port = htons(40713);
            for(i = 0; i < FD_SETSIZE; i++)
                    state[i] = NULL;
            listener = socket(AF_INET, SOCK_STREAM, 0);
            make_nonblocking(listener);
            if(bind(listener, (struct sockaddr*)&sin, sizeof(sin))<0)
                    return;
            if(listen(listener, 16)<0)
                    return;
            FD_ZERO(&readset);
            FD_ZERO(&writeset);
            FD_ZERO(&exset);
            while(true)
            {
                    maxfd = listener;
                    FD_ZERO(&readset);
                    FD_ZERO(&writeset);
                    FD_ZERO(&exset);
                    FD_SET(listener, &readset);
                    for(i = 0; i < FD_SETSIZE; i++)
                    {
                            if(state[i]){
                                    if(i > maxfd)
                                            maxfd = i;
                                    FD_SET(i, &readset);
                                    if(state[i]->writing)
                                    {
                                            FD_SET(i, &writeset);
                                    }
                            }
                    }
                    if(select(maxfd+1, &readset, &writeset, &exset, NULL) < 0)
                    {
                            return;
                    }
                    if(FD_ISSET(listener, &readset))
                    {
                            struct sockaddr_storage ss;
                            socklen_t slen = sizeof(ss);
                            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
                            if(fd < 0)
                                    perror("accept");
                            else if (fd > FD_SETSIZE)
                                    close(fd);
                            else
                            {
                                    make_nonblocking(fd);
                                    state[fd] = alloc_fd_state();
                                    assert(state[fd]);
                            }
                    }
                    for(i = 0; i < maxfd+1; i++)
                    {
                            int r = 0;
                            if(i == listener)
                                    continue;
                            if(FD_ISSET(i, &readset))
                            {
                                    r = do_read(i, state[i]);
                            }
                            if(r == 0 && FD_ISSET(i, &writeset))
                            {
                                    r = do_write(i, state[i]);
                            }
                            if(r)
                            {
                                    free_fd_state(state[i]);
                                    state[i] = NULL;
                                    close(i);
                            }
                    }
            }
    }
    int main(int c, char **v)
    {
            setvbuf(stdout, NULL, _IONBF, 0);
            run();
            return 0;
    }

    上面就是一个简单的 select 模型的代码。主要的思路就是,创建一个数组用来判断socket的状态,这个数组大小是固定的,由 FD_SETSIZE 宏确定。然后初始化一个set,把set放到监听的set列表中。调用 select ,当有我们想监听的操作,读、写、异常触发时,select就会返回,顺带填充好读、写、异常的数组。在读数组中的socket都是可读的,在写数组中的socket都是可写的,在异常数组中的socket都是执行过程中遇到了一些问题需要处理的。我们一个个判断所有的socket是否在上面的任何一个数组中。如果在,就执行对应的操作。

    不过到这里呢,我们还没有结束。因为创建和遍历 select 返回的数组,需要消耗大量的资源,尤其是随着监听的socket变多, select 的性能会表现的非常差。[虽然从用户层面来看,我们的操作是线性的,有几个可读的,我们就去读这些数据就好了,感觉上没什么资源的浪费,但是对于内核来说,不管我们有几个可以操作的socket,内核都要全部遍历一遍,因为内核自己不清楚到底哪个socket可以操作,只有遍历判断之后才能知道,这个是非常消耗性能的,并且随着socket的增多,性能会不断下降,因为同一时间可操作的socket并没有不可操做的socket多,那么对其他现在不需要操作的socket判断遍历就是在浪费资源]

    不同的系统平台,提供了不同的用来代替 select 的方法,比如 poll()  epoll()  kqueue  evports 和 /dev/poll ,所有的这些API除了 poll() 之外,都可以提供更好的性能操作,对于添加socket,删除socket和通知socket都有着O(1)效率。

    但是很不幸的是,这些性能更好的方法并不通用。Linux中是用 epoll() 实现的,BSD包括Darwin系统用的是 kqueue() ,Solaris用的是 evports() 和 /dev/poll 。每一个系统中,都没有其他系统提供的方法。如果我们想开发一个共行能异步的跨平台程序,就需要抽象出这部分内容,对于不同平台进行不同的封装实现。

    这也就是libevent底层所做的工作。libevent提供了统一的接口,用来调用代替 select() 那些方法,在一个系统环境上,libevent会提供最有效的解决方案。

    对于我们上面的程序,这里就有另外一种方法来实现了,用libevent来代替 select 。 fd_sets 就不需要了,我们用libevent里面的 event_base 来关联socket的事件,底层实现可以是 select()  poll()  epoll()  kqueue() 等等。

    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <fcntl.h>
    #include <sys/select.h>
    #include <event2/event.h>
    #include <assert.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <errno.h>
    #define MAX_LINE 16384
    void do_read(evutil_socket_t fd, short events, void *arg);
    void do_write(evutil_socket_t fd, short events, void * arg);
    
    char rot13_char(char c)
    {
        /* We don't want to use isalpha here; setting the locale would change
         * which characters are considered alphabetical. */
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    struct fd_state{
            char buffer [MAX_LINE] = {0};
            size_t buffer_used;
    
            size_t n_written;
            size_t write_upto;
            struct event *read_event;
            struct event *write_event;
    };
    
    struct fd_state * alloc_fd_state(struct event_base* base, evutil_socket_t fd)
    {
            struct fd_state * state = (struct fd_state*)malloc(sizeof(struct fd_state));
            if(!state)
                    return NULL;
            state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
            if(!state->read_event)
            {
                    free(state);
                    return NULL;
            }
            state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);
            if(!state->write_event)
            {
                    event_free(state->read_event);
                    free(state);
                    return NULL;
            }
            state->n_written = state->write_upto = state->buffer_used = 0;
            assert(state->write_event);
            return state;
    }
    
    void free_fd_state(struct fd_state * state)
    {
            event_free(state->read_event);
            event_free(state->write_event);
            free(state);
    }
    
    void make_nonblocking(int fd)
    {
            fcntl(fd, F_SETFL, O_NONBLOCK);
    }
    
    void do_read(evutil_socket_t fd, short events, void* arg)
    {
            struct fd_state *state = (struct fd_state *)arg;
            char buf[1024] = {0};
            int i;
            ssize_t result;
            while(true)
            {
                    assert(state->write_event);
                    result = recv(fd, buf, sizeof(buf), 0);
                    if(result <= 0)
                            break;
                    for(i = 0; i < result; ++i)
                    {
                            if(state->buffer_used < sizeof(state->buffer))
                                    state->buffer[state->buffer_used++] = rot13_char(buf[i]);
                            if(buf[i] == 'q')
                            {
                                    assert(state->write_event);
                                    event_add(state->write_event, NULL);
                                    state->write_upto = state->buffer_used;
                            }
                    }
            }
            if(result == 0)
                    free_fd_state(state);
            else if(result < 0)
            {
                    if(errno == EAGAIN)
                            return;
                    perror("recv");
                    free_fd_state(state);
            }
    
    }
    
    void do_write(evutil_socket_t fd, short events, void * arg)
    {
            struct fd_state *state = (struct fd_state *)arg;
            while(state->n_written < state->write_upto)
            {
                    ssize_t result = send(fd, state->buffer + state->n_written,
                                    state->write_upto - state->n_written, 0);
                    if(result < 0)
                    {
                            if(errno == EAGAIN)
                                    return;
                            free_fd_state(state);
                            return;
                    }
                    assert(result != 0);
                    state->n_written += result;
            }
            if(state->n_written == state->buffer_used)
                    state->n_written = state->write_upto = state->buffer_used = 0;
            event_del(state->write_event);
    }
    void do_accept(evutil_socket_t listener, short event, void *arg)
    {
            struct event_base* base = (struct event_base*) arg;
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
            if(fd < 0)
                    perror("accept");
            else if(fd > FD_SETSIZE)
                    close(fd);
            else
            {
                    struct fd_state * state;
                    evutil_make_socket_nonblocking(fd);
                    state = alloc_fd_state(base, fd);
                    assert(state);
                    assert(state->write_event);
                    event_add(state->read_event, NULL);
            }
    }
    
    void run()
    {
            evutil_socket_t listener;
            struct sockaddr_in sin;
            struct event_base *base;
            struct event *listener_event;
            base = event_base_new();
            if(!base)
                    return;
    
            sin.sin_family = AF_INET;
            sin.sin_addr.s_addr = 0;
            sin.sin_port = htons(40713);
            listener = socket(AF_INET, SOCK_STREAM, 0);
            evutil_make_socket_nonblocking(listener);
            if(bind(listener, (struct sockaddr*)&sin, sizeof(sin))<0)
                    return;
            if(listen(listener, 16)<0)
                    return;
            listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
            event_add(listener_event, NULL);
            event_base_dispatch(base);
    
    }
    int main(int c, char **v)
    {
            setvbuf(stdout, NULL, _IONBF, 0);
            run();
            return 0;
    }

    编译的时候加上libevent的库

    $ g++ test.cpp -levent_core

    使用libevent后,就达到了我们的要求了。我们创建一个base event,然后对于每一个socket,都创建一个event,把我们需要监听的消息设置进去,然后丢给libevent就可以了,设置好回调函数,我们关心的事件触发时会直接回调我们的函数,这样,不用轮询,也不用遍历,直接等待消息过来就可以了。

    在这里我们用 evutil_socket_t 代替了 int ,用 evutil_make_socket_nonblocking 代替了 fcntl(O_NONBLOCK) ,这样做的目的是为了兼容Win32网络编程下不通用的接口。

    便捷性怎么样?或者说Windows平台下怎么样?

    我们可以看到,我们的代码性能更好了,也更复杂了。我们一开始用 fork() 实现的时候,根本不需要管理buffer,每个进程申请一块内存,我们不用关心每个socket是在读还是在写。我们不用关系每次操作了多少数据,我们只需要重复的申请释放buffer就可以了。

    如果你对Windows下的网络编程非常熟悉的话,你会意识到上面的libevent的示例代码下并不是最优的。在Windows下更快的异步IO接口并不是 select() ,而是IOCP(IO Completion Ports)API。与其他的一些更快的网络API不一样的地方是,其他平台上的解决方案,比如 epoll() ,是在这个socket可以操作的时候告诉你,而IOCP是在这个操作结束之后告诉你,它已经把相应的数据处理完了,我们只需要直接使用,不用再次调用接收或是发送进行IO操作了。

    当然,libevent2里面的 bufferevents 已经解决了这个问题,是我们的代码写起来更简单,并且在Linux和Windows下都能提供更好的性能。下面就是我们通过 bufferevents 实现的代码

    /* For sockaddr_in */
    #include <netinet/in.h>
    /* For socket functions */
    #include <sys/socket.h>
    /* For fcntl */
    #include <fcntl.h>
    
    #include <event2/event.h>
    #include <event2/buffer.h>
    #include <event2/bufferevent.h>
    
    #include <assert.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <errno.h>
    
    #define MAX_LINE 16384
    
    void do_read(evutil_socket_t fd, short events, void *arg);
    void do_write(evutil_socket_t fd, short events, void *arg);
    
    char rot13_char(char c)
    {
        /* We don't want to use isalpha here; setting the locale would change
         * which characters are considered alphabetical. */
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    void readcb(struct bufferevent *bev, void *ctx)
    {
        struct evbuffer *input, *output;
        char *line;
        size_t n;
        int i;
        input = bufferevent_get_input(bev);
        output = bufferevent_get_output(bev);
    
        while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
            for (i = 0; i < n; ++i)
                line[i] = rot13_char(line[i]);
            evbuffer_add(output, line, n);
            evbuffer_add(output, "
    ", 1);
            free(line);
        }
    
        if (evbuffer_get_length(input) >= MAX_LINE) {
            /* Too long; just process what there is and go on so that the buffer
             * doesn't grow infinitely long. */
            char buf[1024];
            while (evbuffer_get_length(input)) {
                int n = evbuffer_remove(input, buf, sizeof(buf));
                for (i = 0; i < n; ++i)
                    buf[i] = rot13_char(buf[i]);
                evbuffer_add(output, buf, n);
            }
            evbuffer_add(output, "
    ", 1);
        }
    }
    
    void errorcb(struct bufferevent *bev, short error, void *ctx)
    {
        if (error & BEV_EVENT_EOF) {
            /* connection has been closed, do any clean up here */
            /* ... */
        } else if (error & BEV_EVENT_ERROR) {
            /* check errno to see what error occurred */
            /* ... */
        } else if (error & BEV_EVENT_TIMEOUT) {
            /* must be a timeout event handle, handle it */
            /* ... */
        }
        bufferevent_free(bev);
    }
    
    void do_accept(evutil_socket_t listener, short event, void *arg)
    {
        struct event_base *base = (struct event_base *)arg;
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener, (struct sockaddr*)&ss, &slen);
        if (fd < 0) {
            perror("accept");
        } else if (fd > FD_SETSIZE) {
            close(fd);
        } else {
            struct bufferevent *bev;
            evutil_make_socket_nonblocking(fd);
            bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
            bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
            bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
            bufferevent_enable(bev, EV_READ|EV_WRITE);
        }
    }
    
    void
    run(void)
    {
        evutil_socket_t listener;
        struct sockaddr_in sin;
        struct event_base *base;
        struct event *listener_event;
    
        base = event_base_new();
        if (!base)
            return; /*XXXerr*/
    
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = 0;
        sin.sin_port = htons(40713);
    
        listener = socket(AF_INET, SOCK_STREAM, 0);
        evutil_make_socket_nonblocking(listener);
    
    #ifndef WIN32
        {
            int one = 1;
            setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
        }
    #endif
    
        if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
            perror("bind");
            return;
        }
    
        if (listen(listener, 16)<0) {
            perror("listen");
            return;
        }
    
        listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
        /*XXX check it */
        event_add(listener_event, NULL);
    
        event_base_dispatch(base);
    }
    
    int
    main(int c, char **v)
    {
        setvbuf(stdout, NULL, _IONBF, 0);
    
        run();
        return 0;
    }
  • 相关阅读:
    asp.net 正则表达式
    字符串分隔
    用定时器实现逐渐放大层的功能
    js获取剪贴板内容
    使用无线网卡共享上网
    使用事件探查器跟踪sqlserver进程
    document.all.WebBrowser.ExecWB
    (转)JAVA与.NET DES加密解密
    web打印的实现
    关于div的定位
  • 原文地址:https://www.cnblogs.com/studywithallofyou/p/13047927.html
Copyright © 2011-2022 走看看