Python socket.settimeout方法代码示例

本文整理汇总了Python中socket.settimeout方法的典型用法代码示例。如果您正苦于以下问题：Python socket.settimeout方法的具体用法？Python socket.settimeout怎么用？Python socket.settimeout使用的例子？那么恭喜您，这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块socket的用法示例。

在下文中一共展示了socket.settimeout方法的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def __init__(self, asyncSocketsPool, socket, recvBufSlot=None, sendBufSlot=None) :
        """
        Initialise the common state of an asynchronous socket wrapper.

        The socket is put in non-blocking mode, the optional buffer slots
        are type-checked, and the new instance is registered with the pool
        through ``asyncSocketsPool.AddAsyncSocket(self)``.

        Args:
            asyncSocketsPool: Pool object; only ``AddAsyncSocket`` is used here.
            socket: Socket-like object providing ``settimeout`` and
                ``setblocking``.
            recvBufSlot: ``XBufferSlot`` for received data, or ``None``.
            sendBufSlot: ``XBufferSlot`` for data to send, or ``None``.

        Raises:
            XAsyncSocketException: If ``XAsyncSocket`` itself is instantiated
                (it is abstract), or if configuring the socket, checking the
                buffer slots or registering with the pool fails.
        """
        # Exact type test on purpose: subclasses must pass, the base must not.
        if type(self) is XAsyncSocket :
            raise XAsyncSocketException('XAsyncSocket is an abstract class and must be implemented.')
        self._asyncSocketsPool = asyncSocketsPool
        self._socket           = socket
        self._recvBufSlot      = recvBufSlot
        self._sendBufSlot      = sendBufSlot
        self._expireTimeSec    = None
        self._state            = None
        self._onClosed         = None
        try :
            # Both calls request non-blocking mode; both are kept exactly as
            # the original issued them.
            socket.settimeout(0)
            socket.setblocking(0)
            for bufSlot in (recvBufSlot, sendBufSlot) :
                if bufSlot is not None and type(bufSlot) is not XBufferSlot :
                    raise TypeError('Buffer slots must be XBufferSlot instances.')
            asyncSocketsPool.AddAsyncSocket(self)
        except Exception :
            # Only ordinary errors are reported as bad arguments; a bare
            # ``except`` would also have swallowed KeyboardInterrupt/SystemExit.
            raise XAsyncSocketException('XAsyncSocket : Arguments are incorrects.')
    # ------------------------------------------------------------------------ 
开发者ID:jczic,项目名称:MicroWebSrv2,代码行数:23,代码来源:XAsyncSockets.py


示例2: handle_request

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def handle_request(self):
        """Serve at most one request, waiting no longer than the timeout.

        The wait is bounded by the smaller of the listening socket's own
        timeout and ``self.timeout``; a value of ``None`` on either side
        means "no limit" for that side. If nothing becomes readable in
        time, ``handle_timeout()`` is called instead of serving a request.
        """
        # The socket-level timeout is still honoured for callers that relied
        # on socket.settimeout() before self.timeout existed.
        sock_limit = self.socket.gettimeout()
        own_limit = self.timeout
        if sock_limit is None:
            wait = own_limit
        elif own_limit is None:
            wait = sock_limit
        else:
            wait = min(sock_limit, own_limit)

        # _eintr_retry is defined elsewhere in the module; presumably it
        # re-issues the select call when interrupted (going by its name).
        readable = _eintr_retry(select.select, [self], [], [], wait)[0]
        if readable:
            self._handle_request_noblock()
        else:
            self.handle_timeout()
开发者ID:Soft8Soft,项目名称:verge3d-blender-addon,代码行数:19,代码来源:socketserver.py


示例3: __init__

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def __init__(self, microWebSrv, socket, addr) :
            """
            Set up the per-connection state and immediately process the request.

            Args:
                microWebSrv: Server object that accepted the connection.
                socket: Connected client socket; it gets a 2 second timeout.
                addr: Address of the client, stored as given.
            """
            # The timeout is applied before anything else touches the socket.
            socket.settimeout(2)

            # Connection identity.
            self._microWebSrv   = microWebSrv
            self._socket        = socket
            self._addr          = addr

            # Request line, not known until the request has been parsed.
            self._method        = None
            self._path          = None
            self._httpVer       = None

            # Resource and query defaults.
            self._resPath       = "/"
            self._queryString   = ""
            self._queryParams   = { }

            # Header-derived values.
            self._headers       = { }
            self._contentType   = None
            self._contentLength = 0

            # A socket that already offers readline (MicroPython) is used as
            # the stream itself; otherwise (CPython) a binary read/write file
            # object is created on top of it.
            hasReadline = hasattr(socket, 'readline')
            self._socketfile = socket if hasReadline else socket.makefile('rwb')

            self._processRequest()
        # ------------------------------------------------------------------------ 
开发者ID:jczic,项目名称:MicroWebSrv,代码行数:25,代码来源:microWebSrv.py


示例4: handle_request

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def handle_request(self):
        """Serve a single request, or report a timeout if none arrives.

        The time spent waiting is limited by whichever of the socket's own
        timeout and ``self.timeout`` is set; when both are set the smaller
        one wins, and when neither is set the wait is unbounded.
        """
        # The socket's own timeout counts too, for code that called
        # socket.settimeout() before self.timeout was available.
        limits = [
            value
            for value in (self.socket.gettimeout(), self.timeout)
            if value is not None
        ]
        timeout = min(limits) if limits else None

        readable = select.select([self], [], [], timeout)[0]
        if readable:
            self._handle_request_noblock()
            return
        self.handle_timeout()
开发者ID:glmcdona,项目名称:meddle,代码行数:19,代码来源:SocketServer.py


示例5: testConnectTimeout

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def testConnectTimeout(self):
        """Check that connect() gives up within the configured timeout.

        The elapsed time of the failed connect must stay below the timeout
        plus the allowed ``self.fuzz`` margin.
        """
        # A private address that is unlikely to exist keeps the connect from
        # succeeding before the timeout. A dotted IP is used so that no DNS
        # lookup time is added to the measured connect time.
        target = ('10.0.0.0', 12345)
        limit = 0.001
        self.sock.settimeout(limit)

        started = time.time()
        self.assertRaises(socket.error, self.sock.connect, target)
        finished = time.time()

        elapsed = abs(started - finished)
        in_time = elapsed < limit + self.fuzz
        report = ("timeout (%g) is more than %g seconds more than expected (%g)"
                  % (elapsed, self.fuzz, limit))
        self.assertTrue(in_time, report)
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:22,代码来源:test_timeout.py


示例6: testRecvTimeout

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def testRecvTimeout(self):
        """Check that recv() raises socket.timeout within the allowed time.

        After connecting to ``self.addr_remote``, a receive with a short
        timeout must fail in less than the timeout plus ``self.fuzz``.
        """
        limit = 0.02
        remote_host = self.addr_remote[0]
        with test_support.transient_internet(remote_host):
            self.sock.connect(self.addr_remote)
            # The timeout is only set after the connection is established.
            self.sock.settimeout(limit)

            started = time.time()
            self.assertRaises(socket.timeout, self.sock.recv, 1024)
            finished = time.time()

            elapsed = abs(started - finished)
            in_time = elapsed < limit + self.fuzz
            report = ("timeout (%g) is %g seconds more than expected (%g)"
                      % (elapsed, self.fuzz, limit))
            self.assertTrue(in_time, report)
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:18,代码来源:test_timeout.py


示例7: testAcceptTimeout

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def testAcceptTimeout(self):
        """Check that accept() fails within the configured timeout.

        With nobody connecting, accept() on a listening socket must raise
        in less than the timeout plus the ``self.fuzz`` margin.
        """
        limit = 2
        self.sock.settimeout(limit)
        # bind_port is used to avoid "Address already in use" socket errors.
        test_support.bind_port(self.sock, self.localhost)
        self.sock.listen(5)

        started = time.time()
        self.assertRaises(socket.error, self.sock.accept)
        finished = time.time()

        elapsed = abs(started - finished)
        in_time = elapsed < limit + self.fuzz
        report = ("timeout (%g) is %g seconds more than expected (%g)"
                  % (elapsed, self.fuzz, limit))
        self.assertTrue(in_time, report)
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:18,代码来源:test_timeout.py


示例8: _sock_operation

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def _sock_operation(self, count, timeout, method, *args):
        """
        Exercise one socket method and verify how long it takes to time out.

        ``method`` names an attribute of ``self.sock``; it is called with
        ``args`` up to ``count`` times. One of those calls has to raise
        socket.timeout, and the duration of that call must be below
        ``timeout`` + self.fuzz and above ``timeout`` - 1.0 seconds.
        """
        self.sock.settimeout(timeout)
        operation = getattr(self.sock, method)
        for _attempt in range(count):
            started = time.time()
            try:
                operation(*args)
            except socket.timeout:
                # Only the call that actually timed out is measured.
                elapsed = time.time() - started
                break
        else:
            # Every attempt returned normally, so there is nothing to measure.
            self.fail('socket.timeout was not raised')

        # Both bounds are deliberately loose to allow for timing imprecision.
        upper_bound = timeout + self.fuzz
        lower_bound = timeout - 1.0
        self.assertLess(elapsed, upper_bound)
        self.assertGreater(elapsed, lower_bound)
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:23,代码来源:test_timeout.py


示例9: _send_socket

# 需要导入模块: import socket [as 别名]
# 注意: settimeout 是 socket 对象(socket.socket 实例)的方法, 不能通过 from socket import settimeout 导入
def _send_socket(self, cmd, rtnCmd, ip, port):
        """
        Send a command to the gateway and return the matching JSON reply.

        Args:
            cmd: Command text; it is encoded and sent as one datagram.
            rtnCmd: Value the reply's ``"cmd"`` field must equal.
            ip: Gateway address the datagram is sent to.
            port: Gateway port the datagram is sent to.

        Returns:
            The decoded reply (as produced by ``json.loads``) when its
            ``"cmd"`` field equals ``rtnCmd``. ``None`` when the reply does
            not match, when an empty datagram is received, or when the
            socket times out (in which case the socket is also closed).
        """
        # Imported here so the name refers to the module: the original bound
        # the local name ``socket`` to the socket object, so its
        # ``except socket.timeout`` looked the attribute up on that object
        # and could not catch a real timeout.
        import socket

        sock = self._socket
        try:
            _LOGGER.debug('Sending to GW {0}'.format(cmd))
            self._read_unwanted_data()
            # One timeout setting covers both the send and the receive.
            sock.settimeout(30.0)
            sock.sendto(cmd.encode(), (ip, port))
            data, _sender = sock.recvfrom(1024)
        except socket.timeout:
            _LOGGER.error("Cannot connect to Gateway")
            sock.close()
            return None

        # ``len(data) is not None`` was always true; test for real content.
        if not data:
            _LOGGER.error("No response from Gateway")
            return None

        resp = json.loads(data.decode())
        _LOGGER.debug('Recieved from GW {0}'.format(resp))
        if resp["cmd"] == rtnCmd:
            return resp

        _LOGGER.error("Response from {0} does not match return cmd".format(ip))
        _LOGGER.error(data)
        return None
开发者ID:monster1025,项目名称:aqara-mqtt,代码行数:25,代码来源:xiaomihub.py



注:本文中的socket.settimeout方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。