zoukankan      html  css  js  c++  java
  • socket 发送类

    """用于与 PLC Socket 通讯"""
    import socket
    import logging
    class SocketConnectClass:
        """Socket 连接通讯类"""
        def __init__(self, server_ip, server_port, try_connect_num, client_ip, client_port, recv_size=1024, agreement='TCP'):
            self._socket = None
            self.socket_ip = server_ip
            self.socket_port = server_port
            self.client_ip = client_ip
            self.client_port = client_port
            self.try_connect_num = try_connect_num
            self.recv_size = recv_size
            self.agreement = agreement.upper()
            self.agreement_connect_dict = {'TCP':self.__connect_tcp, 'UDP':self.__connect_udp}
            self.agreement_send_command_dict = {'TCP':self.__send_tcp, 'UDP':self.__send_udp}
    
        def connect(self):
            """连接 Socket"""
            logging.info('Socket 通讯协议:%s', self.agreement)
            connect_func = self.agreement_connect_dict.get(self.agreement)
            if connect_func:
                return connect_func()
            logging.info('不支持该传输协议连接:%s',self.agreement)
    
        def send_command(self, command):
            """发送 Socket 命令"""
            send_command_func = self.agreement_send_command_dict.get(self.agreement)
            if send_command_func:
                return send_command_func(command)
            logging.info('不支持该传输协议发送命令:%s',self.agreement)
    
        def __connect_tcp(self):
            """TCP 协议连接"""
            try_connect_init_num = 0
            while True:
                if try_connect_init_num > self.try_connect_num:
                    logging.info('Socket 连接失败')
                    break
                if try_connect_init_num > 0:
                    logging.info('Socket 连接重试次数:%s',try_connect_init_num)
                    self.client_port += 50
                try:
                    # 1 创建套接字
                    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    self._socket.bind((self.client_ip,self.client_port))
                    # 2 创建连接
                    self._socket.connect((self.socket_ip, self.socket_port))
                    return True
                except TimeoutError:
                    if try_connect_init_num < self.try_connect_num:
                        logging.info('Socket 连接超时')
                    try_connect_init_num += 1
                except Exception as e:
                    if try_connect_init_num < self.try_connect_num:
                        logging.exception('Socket 连接报错')
                    try_connect_init_num += 1
            self._socket = None
    
        def __connect_udp(self):
            """UDP 协议连接"""
            self._socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
            return True
    
        def __send_tcp(self, command):
            """TCP 发送命令"""
            try:
                self._socket.send(command)
                logging.info("Socket 发送命令为: %s",command)
                recv_data = self._socket.recv(self.recv_size)
                logging.info('Socket 接收数据为: %s', recv_data)
                return recv_data
            except Exception:
                logging.exception('Socket 发送命令出错')
        
        def __send_udp(self, command):
            """UDP 发送命令"""
            # 发送数据 字节
            try:
                self._socket.sendto(command,(self.socket_ip,self.socket_port))
                logging.info("Socket 发送命令为: %s",command)
                return True
            except Exception as e:
                logging.exception('Socket 发送命令出错')
    
        # def close(self):
        #     """不需要关闭 Socket 连接"""
        #     if self._socket:
        #         self._socket.close()
        #     self._socket = None
        #     logging.info('Socket 连接已关闭')
    
  • 相关阅读:
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    基于分布式锁解决定时任务重复问题
    基于Redis的Setnx实现分布式锁
    基于数据库悲观锁的分布式锁
    使用锁解决电商中的超卖
  • 原文地址:https://www.cnblogs.com/pythonwl/p/15189929.html
Copyright © 2011-2022 走看看