The evolution of a crude high-concurrency request script

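    A friend recently wanted to load-test an API, and I recommended JMeter, since it is open source and has a command-line mode that makes it easy to wrap in scripts. On a whim, I also wrote a simple high-concurrency script of my own.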

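    My first idea was to combine multiple processes, multiple threads, and coroutines. The reasoning went like this: multiple processes make effective use of multiple cores, and in theory one process per core is ideal. So why add threads on top? Wasn't I worried about the GIL? My thinking at the time was that I was using gevent for concurrency and requests for the HTTP calls, and since requests is blocking, running each requests call inside a coroutine would solve the problem. Here is the script. It targets 2000 concurrent requests (10 processes × 20 threads × 10 greenlets). It is pretty rough, so don't mind the details.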

    # coding:utf-8
    import multiprocessing
    import requests
    import threading
    
    import gevent
    
    
    process_num = 10  # number of processes
    gevent_num = 10  # number of greenlets (coroutines) per thread
    threading_num = 20  # number of threads per process
    
    
    def asynchronous(url):
        threads = []
        for i in range(gevent_num):
            threads.append(gevent.spawn(request_url, url))
        gevent.joinall(threads)
    
    
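    def request_url(url):
        # requests.get blocks the whole thread (sockets are not monkey-patched),
        # so the other greenlets in this thread cannot run while it waits
        code = requests.get(url).status_code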
        if code != 200:
            print "the time request failed: " + str(code)
        else:
            print "the time request ok"
    
    
    def run_in_thread(url):
        threadings = []
    
        for i in xrange(threading_num):
            t = threading.Thread(target=asynchronous, args=(url,))
            t.daemon = True
            t.start()
            threadings.append(t)
        for t in threadings:
            t.join()
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=process_num)
        for i in range(process_num):
            pool.apply_async(run_in_thread, ("https://www.jd.com",))
        pool.close()
        pool.join()
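    The layering in the script above can be sketched with the standard library alone: processes run threads, and each thread runs a batch of greenlets. The version below is a hypothetical Python 3 sketch, not the author's code. It swaps gevent for asyncio, with one event loop per thread, much as gevent gives each thread its own hub. It also replaces the HTTP call with a counter, so the fan-out can be checked without a network. `planned_concurrency`, `fan_out_threads` and their parameters are illustrative names. The process layer is left out. Running `fan_out_threads` inside each of `process_num` worker processes would multiply the total by `process_num`, which is how the script above arrives at 10 × 20 × 10 = 2000.

```python
import asyncio
import threading


def planned_concurrency(process_num, threading_num, gevent_num):
    """Total number of tasks the three nested layers would launch."""
    return process_num * threading_num * gevent_num


def fan_out_threads(threading_num, task_num):
    """Start threading_num threads; each runs its own asyncio event loop
    with task_num concurrent tasks. Returns how many tasks actually ran."""
    done = 0
    lock = threading.Lock()

    async def fake_request():
        nonlocal done
        await asyncio.sleep(0.01)  # stands in for a non-blocking network call
        with lock:  # tasks in different threads update `done` concurrently
            done += 1

    async def batch():
        await asyncio.gather(*(fake_request() for _ in range(task_num)))

    def worker():
        asyncio.run(batch())  # a fresh event loop for this thread

    threads = [threading.Thread(target=worker) for _ in range(threading_num)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return done


print(planned_concurrency(10, 20, 10))  # 2000, the figure quoted above
print(fan_out_threads(4, 25))           # 100
```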

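    But this script has a problem. requests is blocking, so the coroutines are effectively useless: each one blocks until the previous coroutine's request has finished. requests therefore has to be replaced with an asynchronous client. Here is the version after the replacement, which uses Tornado's AsyncHTTPClient.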

    # coding:utf-8
    import multiprocessing
    import threading
    
    import gevent
    import tornado.ioloop  # explicit submodule import; tornado.ioloop.IOLoop is used below
    from tornado.httpclient import AsyncHTTPClient
    
    process_num = 10  # number of processes
    gevent_num = 10  # number of greenlets (coroutines) per thread
    threading_num = 2   # number of threads per process
    
    
    def asynchronous(url):
        threads = []
        for i in range(gevent_num):
            threads.append(gevent.spawn(request_url, url))
        gevent.joinall(threads)
    
    
    def request_url(url):
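        http_client = AsyncHTTPClient()
        # callback-style fetch (Tornado < 6.0; the callback argument was removed in 6.0)
        http_client.fetch(url, callback=handle_request)
        loop = tornado.ioloop.IOLoop.instance()
        # _running is a private attribute; start the loop only if it is not running yet.
        # loop.start() blocks this thread until the loop is stopped
        if loop._running is False:
            loop.start()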
    
    
    def run_in_thread(url):
        threadings = []
    
        for i in xrange(threading_num):
            t = threading.Thread(target=asynchronous, args=(url,))
            t.daemon = True
            t.start()
            threadings.append(t)
        for t in threadings:
            t.join()
    
    
    def handle_request(response):
        # nothing calls loop.stop() here, so loop.start() above never returns
        print response.code
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=process_num)
        for i in range(process_num):
            pool.apply_async(run_in_thread, ("https://www.jd.com/",))
        pool.close()
        pool.join()
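    The reason the first version gains nothing from its greenlets can be reproduced without gevent or a network. In the hypothetical Python 3 sketch below, asyncio stands in for gevent. A coroutine that calls the blocking `time.sleep` holds the event loop, so the tasks run one after another. A coroutine that awaits `asyncio.sleep` yields control, so the waits overlap. `time.sleep` plays the role of `requests.get`, and `asyncio.sleep` plays the role of a non-blocking client such as Tornado's `AsyncHTTPClient`. With gevent specifically, `gevent.monkey.patch_all()` is the usual way to make blocking libraries like requests cooperative, but the scripts here do not use it.

```python
import asyncio
import time


async def blocking_task(delay):
    time.sleep(delay)  # blocks the whole event loop, like an unpatched requests.get


async def cooperative_task(delay):
    await asyncio.sleep(delay)  # yields to the loop while "waiting on the network"


async def run_batch(task, n, delay):
    """Run n copies of task concurrently and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(task(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking = asyncio.run(run_batch(blocking_task, 5, 0.1))
cooperative = asyncio.run(run_batch(cooperative_task, 5, 0.1))
print("blocking:    %.2fs" % blocking)     # roughly 5 x 0.1s, the waits are serialized
print("cooperative: %.2fs" % cooperative)  # roughly 0.1s, the waits overlap
```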

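    There were still problems, though. The on-screen output lagged behind, and printing inside the code slows the asynchronous code down. To gather statistics instead, I used multiprocessing's Manager to accumulate a request count across processes. This showed that throughput actually went down as the number of threads increased, which is the GIL's limitation at work. My conclusion: in Python 2, once you use coroutines, don't also use multiple threads. Below is the code with the thread count set to 1 and the counter added.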

    # coding:utf-8
    import multiprocessing
    
    import gevent
    
    import time
    import tornado.ioloop
    from threadpool import ThreadPool, makeRequests  # third-party "threadpool" package
    from tornado.httpclient import AsyncHTTPClient
    from multiprocessing import Manager
    
    process_num = 20    # number of processes
    gevent_num = 200     # number of greenlets (coroutines) per thread
    threading_num = 1  # number of threads per process
    
    url = "http://www.baidu.com"
    
    # shared counter held by a Manager process; 'i' is the integer typecode
    sum = Manager().Value('i', 0)
    
    
    def asynchronous(url):
        try:
            threads = []
            for i in range(gevent_num):
                threads.append(gevent.spawn(request_url, url))
            gevent.joinall(threads)
        except Exception as e:
            pass  # swallows any exception raised while spawning or joining
    
    
    def request_url(url):
        http_client = AsyncHTTPClient()
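        # not atomic: the proxy does a separate read and write, so concurrent
        # processes can overwrite each other's increments
        sum.value += 1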
        http_client.fetch(url, callback=handle_request)
        # time.sleep(1)
        # print " count: " + str(sum.value) + " cur process: " + str(os.getpid()) + " cur thread: " + str(threading.current_thread)
        global loop
        loop = tornado.ioloop.IOLoop.instance()
        if loop._running is False:
            loop.start()
    
    
    def run_in_thread(url):
        pool = ThreadPool(threading_num)
        # makeRequests creates one work request per item in the argument list,
        # so [url] yields a single request for this pool
        for req in makeRequests(asynchronous, [url]):
            pool.putRequest(req)
        pool.wait()
    
    
    def handle_request(response):
        # print "current site: " + str(response.effective_url) + " , request  time: " + str(
        #     getattr(response, "request_time", "000"))
        loop.stop()
    
    
    def main():
        starttime = time.time()
        pool = multiprocessing.Pool(processes=process_num)
        for i in range(process_num):
            pool.apply_async(run_in_thread, (url,))
        pool.close()
        pool.join()
        print sum.value
        print "cost time: " + str(time.time() - starttime)
    
    
    if __name__ == '__main__':
        main()
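    In the script above, `sum.value += 1` is two separate calls through the Manager proxy, a read and then a write. Increments from different processes can therefore interleave and be lost. Below is a minimal Python 3 sketch of a lock-protected counter. It is not the author's code, and `add_many` and `count_across_processes` are hypothetical names. It also assumes a POSIX system, because it requests the `fork` start method explicitly.

```python
import multiprocessing


def add_many(counter, lock, n):
    """Increment the shared counter n times, taking the lock around each update."""
    for _ in range(n):
        with lock:  # makes the read-modify-write of counter.value atomic
            counter.value += 1


def count_across_processes(process_num, per_process):
    # Assumption: POSIX system; "fork" avoids re-importing this module in children.
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager:
        counter = manager.Value("i", 0)  # "i": integer typecode
        lock = manager.Lock()
        procs = [ctx.Process(target=add_many, args=(counter, lock, per_process))
                 for _ in range(process_num)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return counter.value  # read before the manager shuts down


if __name__ == "__main__":
    print(count_across_processes(4, 200))  # 800
```

    Without the `with lock:` line, the final count can come out lower than process_num × per_process.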

    Here is the output:

    /usr/bin/python2.7 /home/shufeng/workspace/private_project/jobscrawler/center/sample.py
    3244
    cost time: 2.23202705383
    
    Process finished with exit code 0
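    These numbers can be read roughly as follows. The script plans 20 processes × 1 thread × 200 greenlets = 4000 fetches. The counter is incremented in `request_url` before `fetch` is called, so 3244 counts fetches that were started, not responses received. The small Python 3 helper below is hypothetical and for illustration only. It works out the implied rate and the share of planned fetches that were counted, using the two values printed above.

```python
def rate_per_second(count, seconds):
    """Counted fetches per second of wall-clock time."""
    return count / seconds


planned = 20 * 1 * 200      # process_num * threading_num * gevent_num
counted = 3244              # counter value printed by the script above
elapsed = 2.23202705383     # "cost time" printed by the script above

print(planned)                                   # 4000
print(round(rate_per_second(counted, elapsed)))  # 1453
print("%.1f%%" % (100.0 * counted / planned))    # 81.1%
```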
Original post: https://www.cnblogs.com/alexkn/p/7059832.html