zoukankan      html  css  js  c++  java
  • 一个简陋的高并发请求脚本的演进过程

    因为一个朋友最近想搞接口压力测试,推荐了jmeter,因为jmeter开源,且有命令行启动模式,方便封装。兴起时,自己也简单实现了一下高并发的脚本。

    一开始想到的采用的是多进程+多线程+协程。想法是这样的,多进程是为了有效利用多核,理论上最好一个核对应一个进程比较好;那我为什么还要用多线程呢?不怕GIL全局锁吗?当时我是这么想的,因为我用了gevent处理,请求采用requests,但requests是阻塞的方法,所以我把requests操作丢到协程做,就没啥问题了。接下来看看脚本,实现了一个2000并发量的脚本(写的比较烂,不要在意这些细节)

    # coding:utf-8
    import multiprocessing
    import requests
    import threading
    
    import gevent
    
    
    process_num = 10  # 进程数
    gevent_num = 10  # 协程数
    threading_num = 20
    
    
    def asynchronous(url):
        threads = []
        for i in range(gevent_num):
            threads.append(gevent.spawn(request_url, url))
        gevent.joinall(threads)
    
    
    def request_url(url):
        code = requests.get(url).status_code
        if code != 200:
            print "the time request failed: " + str(code)
        else:
            print "the time request ok"
    
    
    def run_in_thread(url):
        threadings = []
    
        for i in xrange(threading_num):
            t = threading.Thread(target=asynchronous, args=(url,))
            t.daemon = True
            t.start()
            threadings.append(t)
        for t in threadings:
            t.join()
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=process_num)
        for i in range(process_num):
            pool.apply_async(run_in_thread, ("https://www.jd.com",))
        pool.close()
        pool.join()

     但是这个脚本是有问题的,因为requests是阻塞方法,因此导致协程其实是无效的,因为它会阻塞直到前一个协程任务结束,所以需要把requests替换成异步方法,下面看看替换后的方法。

    # coding:utf-8
    import multiprocessing
    import threading
    
    import gevent
    import time
    
    import tornado
    from tornado.httpclient import AsyncHTTPClient
    
    process_num = 10  # 进程数
    gevent_num = 10  # 协程数
    threading_num = 2   # 线程数
    
    
    def asynchronous(url):
        threads = []
        for i in range(gevent_num):
            threads.append(gevent.spawn(request_url, url))
        gevent.joinall(threads)
    
    
    def request_url(url):
        http_client = AsyncHTTPClient()
        http_client.fetch(url, callback=handle_request)
        loop = tornado.ioloop.IOLoop.instance()
        if loop._running is False:
            loop.start()
    
    
    def run_in_thread(url):
        threadings = []
    
        for i in xrange(threading_num):
            t = threading.Thread(target=asynchronous, args=(url,))
            t.daemon = True
            t.start()
            threadings.append(t)
        for t in threadings:
            t.join()
    
    
    def handle_request(response):
        print response.code
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=process_num)
        for i in range(process_num):
            pool.apply_async(run_in_thread, ("https://www.jd.com/",))
        pool.close()
        pool.join()

     不过依然还有问题,但是屏幕输出的信息滞后,如果在代码里print,会导致异步代码执行效率降低,为了统计数据,使用了process的Manager来在进程中做并发数累积,此时发现随着线程数的增加,并发能力反而降低了,这就是GIL锁的限制了,因此在python2中,使用了协程的话,就不要使用多线程了,接下来看看将线程改为1后,且加了计数器后的代码

    # coding:utf-8
    import multiprocessing
    
    import gevent
    
    import time
    import tornado
    from threadpool import ThreadPool, makeRequests
    from tornado.httpclient import AsyncHTTPClient
    from multiprocessing import Process,Manager
    
    process_num = 20    # 进程数
    gevent_num = 200     # 协程数
    threading_num = 1  # 线程数
    
    url = "http://www.baidu.com"
    
    sum = Manager().Value('count', 0)
    
    
    def asynchronous(url):
        try:
            threads = []
            for i in range(gevent_num):
                threads.append(gevent.spawn(request_url, url))
            gevent.joinall(threads)
        except Exception as e:
            pass
    
    
    def request_url(url):
        http_client = AsyncHTTPClient()
        sum.value += 1
        http_client.fetch(url, callback=handle_request)
        # time.sleep(1)
        # print " count: " + str(sum.value) + " cur process: " + str(os.getpid()) + " cur thread: " + str(threading.current_thread)
        global loop
        loop = tornado.ioloop.IOLoop.instance()
        if loop._running is False:
            loop.start()
    
    
    def run_in_thread(url):
        pool = ThreadPool(threading_num)
        requests = makeRequests(asynchronous, [url])
        [pool.putRequest(req) for req in requests]
        pool.wait()
    
    
    def handle_request(response):
        # print "current site: " + str(response.effective_url) + " , request  time: " + str(
        #     getattr(response, "request_time", "000"))
        loop.stop()
    
    
    def main():
        starttime = time.time()
        pool = multiprocessing.Pool(processes=process_num)
        for i in range(process_num):
            pool.apply_async(run_in_thread, (url,))
        pool.close()
        pool.join()
        print sum.value
        print "cost time: " + str(time.time() - starttime)
    
    
    if __name__ == '__main__':
        main()

    输出结果可以看看

    /usr/bin/python2.7 /home/shufeng/workspace/private_project/jobscrawler/center/sample.py
    3244
    cost time: 2.23202705383
    
    Process finished with exit code 0
  • 相关阅读:
    springboot运行在eclipse报异常的问题
    Python random模块
    MySQL大小写敏感
    正则表达式详解
    Linux grep命令详解
    Linux printf命令详解
    Linux awk命令详解
    MySQL表介绍
    Linux sed命令详解
    Linux grep命令详解
  • 原文地址:https://www.cnblogs.com/alexkn/p/7059832.html
Copyright © 2011-2022 走看看