zoukankan      html  css  js  c++  java
  • python 多进程并发接口测试实例

    #encoding=utf-8
    import requests
    import json
    import os
    import hashlib

    # Register one fixed account and dump everything the server sends back.
    print("register------")
    register_url = 'http://39.106.41.11:8080/register/'
    account = {'username': 'lildddy1', 'password': 'wcx123wacs', 'email': 'lsily@qq.com'}
    data = json.dumps(account)
    r = requests.post(register_url, data=data)
    print(r.status_code)
    print(r.text)
    # Decode the JSON body only after the raw status and text have been shown.
    reply = r.json()
    print(type(reply))
    print(str(reply))

    c:\Python27\Scripts>python task_test.py

    register------

    200

    {"username": "lildddy1", "code": "01"}

    <type 'dict'>

    {u'username': u'lildddy1', u'code': u'01'}

    #encoding=utf-8

    import requests

    import json

    import string

    import random

    import multiprocessing

    from multiprocessing import Process, Pool, Value, Lock, Manager

    import time

    def test_register(lock,success_Count,failure_Count):
        """Register two randomly named users and tally the outcomes.

        Each attempt POSTs a JSON body to the register endpoint. An attempt
        counts as a success only when the decoded reply is a dict whose
        "code" is "00" and whose "userid" is an int. Every other outcome,
        including a transport error or a reply that is not valid JSON,
        counts as a failure instead of terminating the worker.

        Args:
            lock: lock shared by all workers; held while a counter is updated.
            success_Count: shared object whose ``value`` is incremented once
                per successful registration.
            failure_Count: shared object whose ``value`` is incremented once
                per failed registration.
        """
        print("************")
        data_dict = {'username': 'xxdddy', 'password': 'wcx123wacs', 'email': 'lsily@qq.com'}
        for i in range(2):
            # Ten distinct random lowercase ASCII letters plus the loop index.
            # ascii_lowercase (unlike string.lowercase) does not vary with locale.
            data_dict["username"] = "".join(random.sample(string.ascii_lowercase, 10)) + str(i)
            data = json.dumps(data_dict)
            try:
                r = requests.post('http://39.106.41.11:8080/register/', data=data)
                # Decode the JSON reply once and reuse the resulting object.
                result = r.json()
            except (requests.RequestException, ValueError) as exc:
                print("request failed: %s" % exc)
                registered = False
            else:
                print(str(result))
                registered = (
                    isinstance(result, dict)
                    and result.get("code") == "00"
                    and isinstance(result.get("userid"), int)
                )
            # The context manager releases the lock even if the update raises.
            with lock:
                if registered:
                    success_Count.value += 1
                else:
                    failure_Count.value += 1

    if __name__ == '__main__':
        # Start two worker processes that share a lock and two counters,
        # wait for them, then print the success and failure totals.
        # The manager has to exist before the lock can be created from it.
        manager = Manager()
        # A classmate suggested a shared lock, hence manager.Lock().
        lock = manager.Lock()
        success_Count = manager.Value('i',0)
        failure_Count = manager.Value('i',0)
        proc_list = [Process(target=test_register, args=(lock,success_Count,failure_Count)) for i in range(2)]
        for p in proc_list:
            p.start()
        # Announce the wait before blocking on the workers, not afterwards.
        print('Waiting for all subprocesses done...')
        for p in proc_list:
            p.join()
        print('All subprocesses done.')
        print(success_Count.value)
        print(failure_Count.value)

    c:\Python27\Scripts>python task_test.py

    ************

    ************

    {u'code': u'00', u'userid': 2685}

    {u'code': u'00', u'userid': 2686}

    {u'code': u'00', u'userid': 2687}

    {u'code': u'00', u'userid': 2688}

    Waiting for all subprocesses done...

    All subprocesses done.

    4

    0

    单进程:

    #encoding=utf-8

    import requests

    import json

    import os

    import hashlib

    import string

    # Single-process run: register five users one after another and count
    # how many registrations the server accepted.
    success_count = 0
    faileure_count = 0

    print("register------")
    data_dict = {'username': 'xxdddy', 'password': 'wcx123wacs', 'email': 'lsily@qq.com'}

    for i in range(5):
        data_dict["username"] = "xx2ddddy" + str(i)
        data = json.dumps(data_dict)
        result = None
        try:
            r = requests.post('http://39.106.41.11:8080/register/', data=data)
            print(r.status_code)
            print(r.text)
            # Decode the JSON reply once and reuse the resulting object.
            result = r.json()
        except (requests.RequestException, ValueError) as exc:
            # A transport error or a non-JSON reply is one failed attempt,
            # not a reason to abort the remaining attempts.
            print("request failed: %s" % exc)
        else:
            print(type(result))
            print(str(result))
        # Success requires code "00" and an integer userid; .get() keeps a
        # reply without those keys from raising KeyError.
        if (isinstance(result, dict)
                and result.get("code") == "00"
                and isinstance(result.get("userid"), int)):
            success_count += 1
        else:
            faileure_count += 1
        print("*" * 50)

    print("success count: %s" % success_count)
    print("faileure count: %s" % faileure_count)

    多进程的

    方法1:

    #encoding=utf-8

    import requests

    import json

    import string

    import random

    import multiprocessing

    from multiprocessing import Process, Pool, Value, Lock, Manager

    import time

    def test_register(lock,success_Count,failure_Count):
        """Register two randomly named users and tally the outcomes.

        Each attempt POSTs a JSON body to the register endpoint. An attempt
        counts as a success only when the decoded reply is a dict whose
        "code" is "00" and whose "userid" is an int. Every other outcome,
        including a transport error or a reply that is not valid JSON,
        counts as a failure instead of terminating the worker.

        Args:
            lock: lock shared by all workers; held while a counter is updated.
            success_Count: shared object whose ``value`` is incremented once
                per successful registration.
            failure_Count: shared object whose ``value`` is incremented once
                per failed registration.
        """
        print("************")
        data_dict = {'username': 'xxdddy', 'password': 'wcx123wacs', 'email': 'lsily@qq.com'}
        for i in range(2):
            # Ten distinct random lowercase ASCII letters plus the loop index.
            # ascii_lowercase (unlike string.lowercase) does not vary with locale.
            data_dict["username"] = "".join(random.sample(string.ascii_lowercase, 10)) + str(i)
            data = json.dumps(data_dict)
            try:
                r = requests.post('http://39.106.41.11:8080/register/', data=data)
                # Decode the JSON reply once and reuse the resulting object.
                result = r.json()
            except (requests.RequestException, ValueError) as exc:
                print("request failed: %s" % exc)
                registered = False
            else:
                print(str(result))
                registered = (
                    isinstance(result, dict)
                    and result.get("code") == "00"
                    and isinstance(result.get("userid"), int)
                )
            # The context manager releases the lock even if the update raises.
            with lock:
                if registered:
                    success_Count.value += 1
                else:
                    failure_Count.value += 1

    if __name__ == '__main__':
        # Start two worker processes that share a lock and two
        # manager-backed counters, wait for them, then print the totals.
        lock = Lock()
        manager = Manager()
        success_Count = manager.Value('i',0)
        failure_Count = manager.Value('i',0)
        proc_list = [Process(target=test_register, args=(lock,success_Count,failure_Count)) for i in range(2)]
        for p in proc_list:
            p.start()
        # Announce the wait before blocking on the workers, not afterwards.
        print('Waiting for all subprocesses done...')
        for p in proc_list:
            p.join()
        print('All subprocesses done.')
        print(success_Count.value)
        print(failure_Count.value)

    方法2:

    #encoding=utf-8

    import requests

    import json

    import string

    import random

    import multiprocessing

    from multiprocessing import Process, Pool, Value, Lock, Manager

    import time

    def test_register(counter):
        """Register two randomly named users and report each outcome to counter.

        Each attempt POSTs a JSON body to the register endpoint. An attempt
        counts as a success only when the decoded reply is a dict whose
        "code" is "00" and whose "userid" is an int. Every other outcome,
        including a transport error or a reply that is not valid JSON,
        counts as a failure instead of terminating the worker.

        Args:
            counter: object providing ``increment_success_count()`` and
                ``increment_failure_count()``; exactly one of them is called
                per attempt.
        """
        print("************")
        data_dict = {'username': 'xxdddy', 'password': 'wcx123wacs', 'email': 'lsily@qq.com'}
        for i in range(2):
            # Ten distinct random lowercase ASCII letters plus the loop index.
            # ascii_lowercase (unlike string.lowercase) does not vary with locale.
            data_dict["username"] = "".join(random.sample(string.ascii_lowercase, 10)) + str(i)
            data = json.dumps(data_dict)
            try:
                r = requests.post('http://39.106.41.11:8080/register/', data=data)
                # Decode the JSON reply once and reuse the resulting object.
                result = r.json()
            except (requests.RequestException, ValueError) as exc:
                print("request failed: %s" % exc)
                registered = False
            else:
                print(str(result))
                registered = (
                    isinstance(result, dict)
                    and result.get("code") == "00"
                    and isinstance(result.get("userid"), int)
                )
            if registered:
                counter.increment_success_count()
            else:
                counter.increment_failure_count()

    class Counter(object):
        """Pair of shared integer tallies (success / failure) guarded by one lock."""

        def __init__(self, initval = 0):
            # One lock serialises every read and update of both tallies.
            self.lock = Lock()
            self.success_count = Value('i', initval)
            self.failure_count = Value('i', initval)

        def _bump(self, cell):
            # Add one to the given shared value while holding the lock.
            with self.lock:
                cell.value = cell.value + 1

        def _read(self, cell):
            # Return the given shared value's current number under the lock.
            with self.lock:
                return cell.value

        def increment_success_count(self):
            """Add one to the success tally."""
            self._bump(self.success_count)

        def increment_failure_count(self):
            """Add one to the failure tally."""
            self._bump(self.failure_count)

        def get_success_count(self):
            """Return the current success tally."""
            return self._read(self.success_count)

        def get_failure_count(self):
            """Return the current failure tally."""
            return self._read(self.failure_count)

    if __name__ == '__main__':
        # Start two worker processes that report into one shared Counter,
        # wait for them, then print the success and failure totals.
        # No Manager is created here: Counter holds its own shared values
        # and lock, so a manager server process would go unused.
        counter = Counter()
        proc_list = [Process(target=test_register, args=(counter,)) for i in range(2)]
        for p in proc_list:
            p.start()
        # Announce the wait before blocking on the workers, not afterwards.
        print('Waiting for all subprocesses done...')
        for p in proc_list:
            p.join()
        print('All subprocesses done.')
        print(counter.get_success_count())
        print(counter.get_failure_count())

    用进程池子Pool

    方法3:

    #encoding=utf-8

    import requests

    import json

    import string

    import random

    import multiprocessing

    from multiprocessing import Pool, Value,Manager

    import time

    def test_register(lock,success_Count,failure_Count):
        """Register two randomly named users and tally the outcomes.

        Each attempt POSTs a JSON body to the register endpoint. An attempt
        counts as a success only when the decoded reply is a dict whose
        "code" is "00" and whose "userid" is an int. Every other outcome,
        including a transport error or a reply that is not valid JSON,
        counts as a failure instead of terminating the worker.

        Args:
            lock: lock shared by all workers; held while a counter is updated.
            success_Count: shared object whose ``value`` is incremented once
                per successful registration.
            failure_Count: shared object whose ``value`` is incremented once
                per failed registration.
        """
        print("************")
        data_dict = {'username': 'xxdddy', 'password': 'wcx123wacs', 'email': 'lsily@qq.com'}
        for i in range(2):
            # Ten distinct random lowercase ASCII letters plus the loop index.
            # ascii_lowercase (unlike string.lowercase) does not vary with locale.
            data_dict["username"] = "".join(random.sample(string.ascii_lowercase, 10)) + str(i)
            data = json.dumps(data_dict)
            try:
                r = requests.post('http://39.106.41.11:8080/register/', data=data)
                # Decode the JSON reply once and reuse the resulting object.
                result = r.json()
            except (requests.RequestException, ValueError) as exc:
                print("request failed: %s" % exc)
                registered = False
            else:
                print(str(result))
                registered = (
                    isinstance(result, dict)
                    and result.get("code") == "00"
                    and isinstance(result.get("userid"), int)
                )
            # The context manager releases the lock even if the update raises.
            with lock:
                if registered:
                    success_Count.value += 1
                else:
                    failure_Count.value += 1

    if __name__ == '__main__':
        # Run two registration tasks on a process pool, wait for them, then
        # print the success and failure totals.
        manager = Manager()
        # The lock and counters come from the manager so that they can be
        # passed to pool workers as ordinary task arguments.
        lock = manager.Lock()
        success_Count = manager.Value('i',0)
        failure_Count = manager.Value('i',0)
        pool = Pool(3)  # three worker processes; only two tasks are submitted
        shared_args = (lock, success_Count, failure_Count)
        pending = [pool.apply_async(test_register, args=shared_args) for _ in range(2)]
        pool.close()
        pool.join()
        # apply_async keeps a task's exception inside its result object;
        # fetch each result so a crashed task is reported, not silently lost.
        for task in pending:
            try:
                task.get()
            except Exception as exc:
                print("worker failed: %r" % (exc,))
        print("Process end.")
        print("success count: %s" % success_Count.value)
        print("faileure count: %s" % failure_Count.value)

  • 相关阅读:
    CentOS_关机与重启命令详解
    去除ArrayList重复的对象
    单项设计模式
    死循环的应用
    java学习心得-面向对象与面向过程
    计算机使用个人经验及日常维护
    linux操作系统简介
    集合
    项目学习4
    周末总结
  • 原文地址:https://www.cnblogs.com/xiaxiaoxu/p/8944198.html
Copyright © 2011-2022 走看看