美文网首页测试开发
Python_服务端性能高并发测试

Python_服务端性能高并发测试

作者: 古佛青灯度流年 | 来源:发表于2018-01-14 10:17 被阅读66次

背景

公司业务处于上升期,但是服务端却low的像个demo,于是乎重构服务端开始了;
关于测试,测试这个行业在大多数互联网公司比较失败,做测试的应该都有这种感觉。但是我感觉我很幸运,因为CTO很重视测试,他的口头禅就是不可测试的程序都是不可靠的,所以我们公司的所有程序都会有配套的测试工具。这篇文章中的工具就是专门测试服务端而模拟的客户端TSP。

业务需求

重构服务端,达到接入百万客户端级别

实现原理

  1. 服务端监听3个端口,这三个端口全部是设备发起
  2. tsp模拟客户端连接服务端逻辑,从而达到设备、用户上线
  3. 采用进程池嵌套线程池方式,达到高并发的目的
  4. 代码中统计发包时间平均值、收包时间平均值、收包字节大小平均值、收包错误率;
  5. 服务端采集硬件配置、cpu负载、内存负载、磁盘io等等,想知道更多参数

具体实现

1.构建主流程,完成基于socket请求的收发包

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
from socket import socket,AF_INET,SOCK_STREAM

import struct

from log import log
log=log()
class socket_main:

    def __init__(self,who,conn_data):
        self.conn_data = conn_data
        self.who = who
        self.__conn(conn_data)

    def __conn(self,addr):

        try:
            log.debug('{} _conn data:{}'.format(self.who,addr))
            self.tcpCliSock = socket(AF_INET, SOCK_STREAM)
            self.tcpCliSock.connect(addr)
            self.tcpCliSock.setblocking(1)
            self.tcpCliSock.settimeout(1)
            # self.tcpCliSock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,8192)
            log.debug('{} socket data:{}'.format(self.who,'success'))
        except BaseException as msg:
            log.error('{} socket conn_result:{}'.format(self.who,msg))

    def send(self,data):
        try:
            try:
                result = self.tcpCliSock.send(data)
            except BaseException as msg:
                # self.__conn(self.conn_data)
                # result = self.tcpCliSock.send(data)
                log.error(msg)
                raise msg
            log.debug('{} socket send_len:{}'.format(self.who, result))
            return result
        except BaseException as msg:
            raise log.error('{} socket send_error:{}'.format(self.who,msg))

    def recv(self,size,code):
        result =''
        try:
            try:
                result = self.tcpCliSock.recv(int(size))

                while len(result)<int(size):
                    result +=self.tcpCliSock.recv(int(size)-len(result))
            except:
                pass
            if not result:
               log.error('{} socket recv_result ERROR:result is null'.format(self.who))
            log.debug('{} socket recv_result:{}'.format(self.who, len(result)))
            data_struct = struct.unpack(code, result)
            log.debug('{} socket recv_result_unpack:{}'.format(self.who, data_struct))
            return data_struct
        except BaseException as msg:
            log.error('{} socket recv_error:{}'.format(self.who,msg))
            raise msg

    def only_recv(self,size):
        try:
            result = self.tcpCliSock.recv(int(size))
            while len(result) < int(size):
                result += self.tcpCliSock.recv(int(size) - len(result))
            log.debug('{} socket only_recv_result_len:{}'.format(self.who, len(result)))
            log.debug('only_recv:{}'.format(result))
            return result
        except BaseException as msg:
            log.error('{} socket recv_error:{}'.format(self.who,msg))
            raise msg

    def close(self):
        try:
            log.debug('{} socket has been closed'.format(self.who))
            self.tcpCliSock.close()
        except BaseException as msg:
            log.debug('{} socket close_error:{}'.format(self.who,msg))

  1. 建立进程池、线程池套餐
def main():
  print '业务代码'

class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        apply(self.func, self.args)

def main_thread():
    global sn
    threads = []
    nloops = xrange(thread_num)# thread_num并发线程数
    for i in nloops:
        mac, mac_real, sn = getMacSn()
        t = MyThread(main, (mac,mac_real,sn))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()


if __name__=='__main__':
    result = ''
    pool = multiprocessing.Pool(processes=proc)# processes进程池数量
    log.info("main process(%d) running..." % os.getpid())
    for i in xrange(proc_num):# proc_num 并发进程数量
        result = pool.apply_async(main_thread)
    pool.close()
    pool.join()

    if not result.successful():
        log.error('主进程异常:{}'.format(result.successful()))
    else:
        log.info('goodbye:主进程({})执行完毕'.format(os.getpid()))

  1. 通过在业务代码中计算统计参数,然后放在内存中累计计算,输出到指定级别日志中,用到的主要方法是通过设置全局变量进行统一计算:
class globalMap:
    # 拼装成字典构造全局变量  借鉴map  包含变量的增删改查
    map = {}

    def set_map(self, key, value):
        if(isinstance(value,dict)):
            value = json.dumps(value)
        self.map[key] = value
        log.debug(key + ":" + str(value))


    def set(self, **keys):
        try:
            for key_, value_ in keys.items():
                self.map[key_] = str(value_)
                log.debug(key_+":"+str(value_))
        except BaseException as msg:
            log.error(msg)
            raise msg

    def del_map(self, key):
        try:
            del self.map[key]
            return self.map
        except KeyError:
            log.error("key:'" + str(key) + "'  不存在")


    def get(self,*args):
        try:
            dic = {}
            for key in args:
                if len(args)==1:
                    dic = self.map[key]
                    log.debug(key+":"+str(dic))
                elif len(args)==1 and args[0]=='all':
                    dic = self.map
                else:
                    dic[key]=self.map[key]
            return dic
        except KeyError:
            log.warning("key:'" + str(key) + "'  不存在")
            return 'Null_'

总结

多进程的方式和多线程进行对比,虽然Python全局锁的限制导致线程有点瑕疵,但是跟进程比起并发,还是能甩进程几条街的。通过这种方式进行高密度业务逻辑操作,可以很轻松找到服务端瓶颈。

@雾霾-一个早晨,被窝中-2018-01-14 10:17:13

相关文章

  • Python_服务端性能高并发测试

    背景 公司业务处于上升期,但是服务端却low的像个demo,于是乎重构服务端开始了;关于测试,测试这个行业在大多数...

  • PTS + ARMS打造性能和应用诊断利器

    摘要:服务端的性能测试,尤其是业务性能测试,是用来评估性能容量、诊断性能瓶颈和应用错误,或是验证高可用的能力,以此...

  • 经典组合 | PTS + ARMS打造性能和应用诊断利器

    服务端的性能测试,尤其是业务性能测试,是用来评估性能容量、诊断性能瓶颈和应用错误,或是验证高可用的能力,以此达到降...

  • Nginx Tomcat集群配置

    并发访问 对于服务器来说,大量的并发访问容易造成服务器宕机 并发访问性能测试 可以通过压力测试来检查高并发访问的性...

  • aiohttp 并发测试web性能

    aiohttp 并发测试web性能

  • 开源性能压测工具 locust

    本文主要针对服务端,web系统的性能测试。 性能测试 根据不同的测试目的,性能测试具体细分为多种类型 基准测试 :...

  • 《大型网站技术架构》瞬时响应之网站的高性能架构(1)

    一、网站性能测试 (1)性能测试指标:①响应时间;②并发数;③吞吐量;④性能计数器; (2)性能测试方法:①性能测...

  • [erlang]高并发下的日志输出技巧

    前言 之前, 公司引进了压力测试的概念, 会使用 tsung 测试高并发下服务的性能. 这一测试, 原始的 lag...

  • 服务三高

    三高定义 高并发、高性能、高可用 1.1高并发与高性能的关系 高并发指的是机器的并发连接数。如果静态的看待,就是一...

  • 性能测试概览Ⅳ

    五类性能测试用例 ●预期指标的性能测试 ●并发用户的性能测试 ●疲劳强度和大数据量的性能测试 ●服务器性能测试 ●...

网友评论

  • 望月成三人:多进程+协程是否合适,之前简单研究过,不知道是否能做高并发测试

本文标题:Python_服务端性能高并发测试

本文链接:https://www.haomeiwen.com/subject/vpfkoxtx.html