Python socket.getaddrinfo方法代码示例

本文整理汇总了Python中socket.getaddrinfo方法的典型用法代码示例。如果您正苦于以下问题：Python socket.getaddrinfo方法的具体用法？Python socket.getaddrinfo怎么用？Python socket.getaddrinfo使用的例子？那么恭喜您，这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块socket的用法示例。

在下文中一共展示了socket.getaddrinfo方法的26个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: host_is_local

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def host_is_local(hostname, port=22):  # default to the ssh port when none is given
    """
    Tell whether ``hostname`` refers to the machine this code runs on.

    The name is first canonicalised with ``socket.getfqdn``.  It counts as
    local when it is one of the well-known loopback/wildcard spellings, when
    it equals ``socket.gethostname()``, or when any address it resolves to is
    also an address the local host name resolves to.

    Returns True for a local host, otherwise False.
    """
    loopback_names = (
        "localhost", "0.0.0.0", "127.0.0.1", "1.0.0.127.in-addr.arpa",
        "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa",
    )
    fqdn = socket.getfqdn(hostname)
    if fqdn in loopback_names:
        return True

    own_name = socket.gethostname()
    if fqdn == own_name:
        return True

    # Resolve the local name first, then the target, and compare only the
    # host part (element 0) of every returned socket address.
    own_hosts = {info[4][0] for info in socket.getaddrinfo(own_name, port)}
    target_infos = socket.getaddrinfo(fqdn, port)
    return any(info[4][0] in own_hosts for info in target_infos)
开发者ID:CodeReclaimers,项目名称:neat-python,代码行数:22,代码来源:distributed.py


示例2: create_socket_ipv6_tx_ucast

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def create_socket_ipv6_tx_ucast(self, remote_address, port):
    """Create a UDP/IPv6 socket connected to a unicast peer.

    :param remote_address: host name or IPv6 literal of the peer.
    :param port: destination UDP port.
    :return: the connected socket, or None when the socket could not be
        created, the address could not be resolved, or connect() failed.
        Every failure is reported through ``self.warning``.
    """
    try:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    except IOError as err:
        self.warning("Could not create IPv6 UDP socket: %s", err)
        return None
    self.enable_addr_and_port_reuse(sock)
    try:
        # Only the first resolution result is used; element 4 is the sockaddr.
        sock_addr = socket.getaddrinfo(remote_address, port, socket.AF_INET6,
                                       socket.SOCK_DGRAM)[0][4]
        sock.connect(sock_addr)
    except IOError as err:
        self.warning("Could not connect UDP socket to address %s port %d: %s",
                     remote_address, port, err)
        # The caller only receives None, so nobody else can close this
        # descriptor: release it here instead of leaking it.
        sock.close()
        return None
    return sock
开发者ID:brunorijsman,项目名称:rift-python,代码行数:18,代码来源:interface.py


示例3: _do_dns_lookup

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def _do_dns_lookup(hostname: str, port: int) -> str:
    """Resolve ``hostname`` and return one IP address as a string.

    The first IPv4 result is preferred, to work around buggy networks when a
    host has both IPv4 and IPv6 addresses; if there is no IPv4 result the
    first entry returned by the resolver is used.

    Raises:
        ServerHostnameCouldNotBeResolved: if the lookup fails.
    """
    try:
        # IPPROTO_IP is a protocol, so pass it by keyword; positionally the
        # fourth argument of getaddrinfo() is the socket *type*.
        addr_infos = socket.getaddrinfo(hostname, port, socket.AF_UNSPEC, proto=socket.IPPROTO_IP)
    except (socket.gaierror, IndexError, ConnectionError) as e:
        raise ServerHostnameCouldNotBeResolved(f"Could not resolve {hostname}") from e

    # Stop at the first IPv4 entry so the resolver's ordering is respected.
    for family, _socktype, _proto, _canonname, sockaddr in addr_infos:
        if family == socket.AF_INET:
            return sockaddr[0]

    # No IPv4 address at all: fall back to the first entry (IPv6).
    return addr_infos[0][4][0]
开发者ID:nabla-c0d3,项目名称:sslyze,代码行数:19,代码来源:server_setting.py


示例4: is_valid_ip

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def is_valid_ip(ip):
    """Returns true if the given string is a well-formed IP address.

    Supports IPv4 and IPv6.  No DNS lookup is performed: the string is
    parsed with ``AI_NUMERICHOST``.

    Raises:
        socket.gaierror: for resolver errors other than ``EAI_NONAME``.
    """
    if not ip or '\x00' in ip:
        # getaddrinfo resolves empty strings to localhost, and truncates
        # on zero bytes.
        return False
    try:
        res = socket.getaddrinfo(ip, 0, socket.AF_UNSPEC,
                                 socket.SOCK_STREAM,
                                 0, socket.AI_NUMERICHOST)
    except socket.gaierror as e:
        if e.args[0] == socket.EAI_NONAME:
            return False
        raise
    except UnicodeError:
        # The idna codec applied to str hosts rejects empty labels and
        # labels longer than 63 characters, even with AI_NUMERICHOST.
        # Such a string is certainly not an IP address.
        return False
    return bool(res)
开发者ID:tao12345666333,项目名称:tornado-zh,代码行数:21,代码来源:netutil.py


示例5: domain_ip_parser

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def domain_ip_parser(ip_or_domain_or_url, local_dns, ipv6):
    """Extract the host to query from an IP, a domain name or an http(s) URL.

    A URL is reduced to the part between ``//`` and the next ``/``; a lone
    ``"."`` becomes the empty string; anything else is taken verbatim.
    When ``local_dns`` is true the host is resolved with
    ``socket.gethostbyname``; otherwise, when ``ipv6`` is true, the first
    AF_INET6 address from ``socket.getaddrinfo`` is returned.
    """
    if ip_or_domain_or_url.startswith(("https://", "http://")):
        # "scheme://host/path".split('/') -> ['scheme:', '', 'host', 'path']
        target = ip_or_domain_or_url.split('/')[2]
    elif ip_or_domain_or_url == ".":
        target = ""
    else:
        target = ip_or_domain_or_url

    if local_dns:
        import socket
        return socket.gethostbyname(target)
    if ipv6:
        import socket
        first_result = socket.getaddrinfo(target, None, socket.AF_INET6)[0]
        sockaddr = first_result[-1]
        return sockaddr[0]
    return target
开发者ID:KiriKira,项目名称:scripts,代码行数:20,代码来源:ipip.py


示例6: select_address_family

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def select_address_family(host, port):
    """Guess the socket address family from the shape of ``host``.

    Returns ``socket.AF_UNIX`` for ``unix://`` hosts, ``socket.AF_INET6``
    when the host contains a colon and this platform exposes ``AF_INET6``,
    and ``socket.AF_INET`` in every other case.  ``port`` is accepted but
    not consulted by the active code.
    """
    # NOTE: a getaddrinfo()-based detection (AF_UNSPEC, SOCK_STREAM,
    # AI_PASSIVE, returning the family of the first result and ignoring
    # gaierror) used to live here.  It was disabled due to problems with
    # IPv6 implementations on various operating systems, so the family is
    # inferred from the host string alone.
    if host.startswith("unix://"):
        return socket.AF_UNIX
    looks_like_ipv6 = ":" in host and hasattr(socket, "AF_INET6")
    return socket.AF_INET6 if looks_like_ipv6 else socket.AF_INET
开发者ID:Frank-qlu,项目名称:recruit,代码行数:22,代码来源:serving.py


示例7: test_basic_behavior

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def test_basic_behavior(self):
    """Starting the server creates one sub-server per resolved address.

    getaddrinfo is stubbed to report an IPv4 and an IPv6 sockaddr for
    'localhost'.  The IPv4 sub-server is expected to be created on port 0,
    and the IPv6 sub-server on the port the IPv4 one reports (123).
    """
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    # Expectations are recorded from here until ReplayAll(): the calls below
    # hit the stubs, not the real implementations.
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    # Port the IPv4 mock reports; the IPv6 server below is expected on it.
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start()
    # Switch to replay, exercise the code under test, then verify that every
    # recorded expectation was met.
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server, inet6_server],
                          self.server._servers)
开发者ID:elsigh,项目名称:browserscope,代码行数:23,代码来源:wsgi_server_test.py


示例8: test_retry_limited

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def test_retry_limited(self):
    """start() raises BindError once every port-0 retry has failed.

    One IPv4/IPv6 mock pair is prepared per allowed retry
    (wsgi_server._PORT_0_RETRIES).  In each round the IPv6 server fails to
    start with EADDRINUSE and the IPv4 server is expected to be quit.
    """
    inet4_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
                     for _ in range(wsgi_server._PORT_0_RETRIES)]
    inet6_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
                     for _ in range(wsgi_server._PORT_0_RETRIES)]
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    # Expectations are recorded from here until ReplayAll().
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])
    for offset, (inet4_server, inet6_server) in enumerate(zip(
        inet4_servers, inet6_servers)):
      # Each round: the IPv4 server reports a distinct port (offset + 1) and
      # the IPv6 server is expected to be created on that same port.
      wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
          inet4_server)
      inet4_server.start()
      inet4_server.port = offset + 1
      wsgi_server._SingleAddressWsgiServer(('::1', offset + 1), None).AndReturn(
          inet6_server)
      inet6_server.start().AndRaise(
          wsgi_server.BindError('message', (errno.EADDRINUSE, 'in use')))
      # The already-started IPv4 server is expected to be shut down.
      inet4_server.quit()
    self.mox.ReplayAll()
    self.assertRaises(wsgi_server.BindError, self.server.start)
    self.mox.VerifyAll()
开发者ID:elsigh,项目名称:browserscope,代码行数:27,代码来源:wsgi_server_test.py


示例9: test_ignore_other_errors

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def test_ignore_other_errors(self):
    """A non-EADDRINUSE BindError on one address does not abort start().

    The IPv6 sub-server fails to start with ENOPROTOOPT; start() is expected
    to return normally and to keep only the IPv4 sub-server.
    """
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    # Expectations are recorded from here until ReplayAll().
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    # Port the IPv4 mock reports; the IPv6 server below is expected on it.
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start().AndRaise(
        wsgi_server.BindError('message', (errno.ENOPROTOOPT, 'no protocol')))
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server],
                          self.server._servers)
开发者ID:elsigh,项目名称:browserscope,代码行数:24,代码来源:wsgi_server_test.py


示例10: select_ip_version

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def select_ip_version(host, port):
    """Guess whether to connect to ``host`` over IPv4 or IPv6.

    Returns ``socket.AF_INET6`` when the host contains a colon and this
    platform exposes ``AF_INET6``; otherwise ``socket.AF_INET``.  ``port``
    is accepted but not consulted by the active code.
    """
    # NOTE: a getaddrinfo()-based detection (AF_UNSPEC, SOCK_STREAM,
    # AI_PASSIVE, returning the family of the first result and ignoring
    # gaierror) used to live here.  It was disabled due to problems with
    # IPv6 implementations on various operating systems, so the family is
    # inferred from the host string alone.
    wants_ipv6 = ':' in host and hasattr(socket, 'AF_INET6')
    return socket.AF_INET6 if wants_ipv6 else socket.AF_INET
开发者ID:jpush,项目名称:jbox,代码行数:19,代码来源:serving.py


示例11: _start

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def _start(self):
    """Define the helper that resolves one seed address into ``self._found``."""
    def try_address(address):
        """Resolve ``address`` (a ``(host, port)`` pair) over IPv4/TCP.

        Every resulting ``(ip, port)`` pair is appended to ``self._found``
        while holding ``self._lock``.  This is best effort: any failure is
        swallowed on purpose so that one bad seed cannot break the rest.
        """
        try:
            (ip_address, port) = address
            for info in socket.getaddrinfo(ip_address, port, socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP):
                try:
                    with self._lock:
                        # info[4] is the sockaddr: (ip, port) for AF_INET.
                        self._found.append((info[4][0], info[4][1]))
                except Exception:
                    pass
                # A per-result back-off ("snooze", growing as 1.3 ** index
                # and capped around 600s plus jitter) used to run here so
                # each dns_seed had a chance to add nodes; it is disabled.
        except Exception:
            pass
开发者ID:ricmoo,项目名称:pycoind,代码行数:24,代码来源:bootstrap.py


示例12: _xmit_packet

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def _xmit_packet(self, retry=True, delay_xmit=None):
    """Send ``self.netpacket`` to the BMC and optionally arm the retry timer.

    :param retry: when true, register this session in
        ``self.waiting_sessions`` with a deadline ``self.timeout`` from now.
    :param delay_xmit: when not None, nothing is sent; the session is only
        registered with a deadline ``delay_xmit`` from now.
    :raises exc.IpmiException: if ``self.bmc`` cannot be resolved.
    """
    def arm_timer(delay):
        # Register this session so the retry machinery picks it up later.
        slot = {}
        self.waiting_sessions[self] = slot
        slot['ipmisession'] = self
        slot['timeout'] = delay + _monotonic_time()

    if self.sequencenumber:
        self.sequencenumber += 1

    if delay_xmit is not None:
        # skip transmit, let the retry timer do its thing
        arm_timer(delay_xmit)
        return

    if self.sockaddr:
        self.send_data(self.netpacket, self.sockaddr)
    else:
        # No known peer address yet: send to every address the BMC
        # resolves to and remember each of them.
        self.allsockaddrs = []
        try:
            candidates = socket.getaddrinfo(self.bmc, self.port, 0, socket.SOCK_DGRAM)
            for family, _socktype, _proto, _canonname, peer in candidates:
                if family == socket.AF_INET:
                    # express the IPv4 sockaddr as an IPv4-mapped AF_INET6 one
                    peer = ('::ffff:' + peer[0], peer[1], 0, 0)
                self.allsockaddrs.append(peer)
                self.bmc_handlers[peer] = self
                self.send_data(self.netpacket, peer)
        except socket.gaierror:
            raise exc.IpmiException("Unable to transmit to specified address")

    if retry:
        arm_timer(self.timeout)
开发者ID:rhtyd,项目名称:ipmisim,代码行数:31,代码来源:fakesession.py


示例13: getdestbyaddr

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def getdestbyaddr(self, address, af):
    """Resolve ``address`` to a ``(host, port)`` pair in address family ``af``.

    An AF_INET6 address whose host already starts with ``[`` is returned
    unchanged.  Otherwise the first getaddrinfo() result of family ``af``
    is used, and IPv6 hosts are wrapped in square brackets.

    Raises Exception when no result of the requested family exists.
    """
    if af == AF_INET6 and address[0].startswith('['):
        return address
    resolved = getaddrinfo(address[0], address[1], af, self.sock_type)
    candidates = [entry[4] for entry in resolved if entry[0] == af]
    if not candidates:
        raise Exception('no match for the %s in AF %s' % (address, af))
    best = candidates[0]
    host = '[%s]' % best[0] if af == AF_INET6 else best[0]
    return (host, best[1])
开发者ID:sippy,项目名称:rtp_cluster,代码行数:14,代码来源:Rtp_proxy_client_net.py


示例14: run

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def run(self):
    """Worker loop: take queued datagrams from ``self.userv.wi`` and send them.

    Each work item is a ``(data, address)`` pair.  A ``None`` item is a
    shutdown request: it is put back for the other workers and the loop
    ends.  Items whose destination cannot be resolved are dropped.  Each
    send is attempted up to 20 times, pausing ``sleep(0.01)`` between
    attempts, and is abandoned on any socket error other than
    EWOULDBLOCK/ENOBUFS/EAGAIN.  ``self.userv`` is cleared on exit.
    """
    while True:
        # ``with`` guarantees the condition's lock is released even if
        # something raises while it is held.
        with self.userv.wi_available:
            while not self.userv.wi:
                self.userv.wi_available.wait()
            wi = self.userv.wi.pop(0)
            if wi is None:
                # Shutdown request, relay it further
                self.userv.wi.append(None)
                self.userv.wi_available.notify()
        if wi is None:
            break
        data, address = wi
        try:
            ai = socket.getaddrinfo(address[0], None, self.userv.uopts.family)
        except Exception:
            # Unresolvable destination: drop this datagram.  Unlike a bare
            # ``except:`` this lets KeyboardInterrupt/SystemExit propagate.
            continue
        if self.userv.uopts.family == socket.AF_INET:
            address = (ai[0][4][0], address[1])
        else:
            # AF_INET6 sockaddrs also carry flowinfo and scope id.
            address = (ai[0][4][0], address[1], ai[0][4][2], ai[0][4][3])
        for _ in range(20):
            try:
                if self.userv.skt.sendto(data, address) == len(data):
                    break
            except socket.error as why:
                if isinstance(why, BrokenPipeError):
                    self.userv = None
                    return
                if why.errno not in (EWOULDBLOCK, ENOBUFS, EAGAIN):
                    break
            sleep(0.01)
    self.userv = None
开发者ID:sippy,项目名称:rtp_cluster,代码行数:36,代码来源:Udp_server.py


示例15: __init__

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def __init__(self, global_config, uopts):
    """Create the UDP socket, bind it if requested, and start the workers.

    :param global_config: accepted but not used by this constructor.
    :param uopts: options object; a copy obtained via ``getCopy()`` is kept
        as ``self.uopts``.  Its ``family``, ``laddress``, ``flags`` and
        ``nworkers`` attributes are read here.

    When ``uopts.laddress`` is set the socket is bound to it; if the
    requested port is 0, ``self.uopts.laddress`` is replaced by the address
    actually assigned.  ``nworkers`` sender/receiver pairs are created
    (``_DEFAULT_NWORKERS`` when ``uopts.nworkers`` is None).
    """
    self.uopts = uopts.getCopy()
    self.skt = socket.socket(self.uopts.family, socket.SOCK_DGRAM)
    if self.uopts.laddress is not None:
        try:
            ai = socket.getaddrinfo(self.uopts.laddress[0], None, self.uopts.family)
            if self.uopts.family == socket.AF_INET:
                address = (ai[0][4][0], self.uopts.laddress[1])
            else:
                # AF_INET6 sockaddrs also carry flowinfo and scope id.
                address = (ai[0][4][0], self.uopts.laddress[1], ai[0][4][2], ai[0][4][3])
            if (self.uopts.flags & socket.SO_REUSEADDR) != 0:
                self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if hasattr(socket, 'SO_REUSEPORT') and \
              (self.uopts.flags & socket.SO_REUSEPORT) != 0:
                self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            self.skt.bind(address)
        except Exception:
            # Construction failed: do not leave the descriptor open behind
            # a half-built object.
            self.skt.close()
            raise
        if self.uopts.laddress[1] == 0:
            self.uopts.laddress = self.skt.getsockname()
    self.sendqueue = []
    self.stats = [0, 0, 0]
    self.wi_available = Condition()
    self.wi = []
    self.asenders = []
    self.areceivers = []
    if self.uopts.nworkers is None:
        nworkers = _DEFAULT_NWORKERS
    else:
        nworkers = self.uopts.nworkers
    for _ in range(nworkers):
        self.asenders.append(AsyncSender(self))
        self.areceivers.append(AsyncReceiver(self))
开发者ID:sippy,项目名称:rtp_cluster,代码行数:32,代码来源:Udp_server.py


示例16: local4remote

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def local4remote(lookup_address, family = socket.AF_INET):
    """Return the local address the OS would use to reach ``lookup_address``.

    A UDP socket is connect()ed to the resolved address on port 12345 and
    the local end of that socket is read back with getsockname().

    :param lookup_address: remote host name or address literal.
    :param family: ``socket.AF_INET`` (default) or another family such as
        ``socket.AF_INET6``.
    :return: the local address as a string; wrapped in square brackets when
        ``family`` is not AF_INET.
    """
    skt = socket.socket(family, socket.SOCK_DGRAM)
    try:
        ai = socket.getaddrinfo(lookup_address, None, family)
        if family == socket.AF_INET:
            _address = (ai[0][4][0], 12345)
        else:
            # AF_INET6 sockaddrs also carry flowinfo and scope id.
            _address = (ai[0][4][0], 12345, ai[0][4][2], ai[0][4][3])
        skt.connect(_address)
        local_host = skt.getsockname()[0]
    finally:
        # The socket is only a probe; always release it, also on errors.
        skt.close()
    if family == socket.AF_INET:
        return local_host
    return '[%s]' % local_host
开发者ID:sippy,项目名称:rtp_cluster,代码行数:15,代码来源:misc.py


示例17: allowed_gai_family

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def allowed_gai_family():
    """Return the address family to hand to getaddrinfo().

    With ``HAS_IPV6`` set this is ``socket.AF_UNSPEC``, which is
    getaddrinfo's default and makes it search for both IPv6 and IPv4
    records; otherwise lookups are restricted to ``socket.AF_INET``.
    """
    return socket.AF_UNSPEC if HAS_IPV6 else socket.AF_INET
开发者ID:danielecook,项目名称:gist-alfred,代码行数:11,代码来源:connection.py


示例18: create_connection

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT,
                      source_address=None):
    """Backport of 3-argument create_connection() for Py2.6.

    Connect to *address*, a 2-tuple ``(host, port)``, and return the socket
    object.  Every address the host resolves to is tried in turn until one
    connection succeeds.

    If *timeout* is given it is set on the socket before connecting;
    otherwise the global default timeout returned by
    :func:`getdefaulttimeout` applies.  If *source_address* is set it must
    be a ``(host, port)`` tuple that the socket is bound to before
    connecting; a host of '' or port 0 tells the OS to use the default.

    Raises the error of the last failed attempt, or ``error`` if the host
    resolved to no address at all.
    """
    host, port = address
    last_failure = None
    for af, socktype, proto, _canonname, sockaddr in getaddrinfo(host, port, 0, SOCK_STREAM):
        conn = None
        try:
            conn = socket(af, socktype, proto)
            if timeout is not _GLOBAL_DEFAULT_TIMEOUT:
                conn.settimeout(timeout)
            if source_address:
                conn.bind(source_address)
            conn.connect(sockaddr)
        except error as exc:
            # Remember the failure, drop this socket, try the next address.
            last_failure = exc
            if conn is not None:
                conn.close()
        else:
            return conn

    if last_failure is None:
        raise error("getaddrinfo returns an empty list")
    raise last_failure
# Backport from Py2.7 for Py2.6: 
开发者ID:Soft8Soft,项目名称:verge3d-blender-addon,代码行数:43,代码来源:misc.py


示例19: find_free_address

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def find_free_address():
    """Find an unused TCP port on localhost (IPv4 or IPv6).

    Each address getaddrinfo() reports for host ``None`` and port 0 is
    bound in turn; the probe socket is always closed again.

    :return: ``"host:port"`` for an IPv4 address or ``"[host]:port"`` for
        an IPv6 address.
    :raises socket.error: the last bind/socket error if no address worked,
        or a generic error when getaddrinfo() returned nothing usable.
    """
    templates = {socket.AF_INET: "{}:{}", socket.AF_INET6: "[{}]:{}"}
    last_failure = None
    candidates = socket.getaddrinfo(None, 0, socket.AF_UNSPEC,
                                    socket.SOCK_STREAM)
    for family, stype, proto, _canonname, addr in candidates:
        probe = None
        try:
            probe = socket.socket(family, stype, proto)
            probe.bind(addr)
            template = templates.get(family)
            if template is not None:
                # Only host and port matter; IPv6 names carry extra fields.
                return template.format(*probe.getsockname()[:2])
        except socket.error as exc:
            last_failure = exc
        finally:
            if probe is not None:
                probe.close()

    if last_failure is None:
        raise socket.error("getaddrinfo returns an empty list")
    raise last_failure  # pylint: disable=raising-bad-type
开发者ID:latchset,项目名称:custodia,代码行数:28,代码来源:test_cli.py


示例20: create_socket_ipv6_rx_ucast

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def create_socket_ipv6_rx_ucast(self):
    """Create a non-blocking UDP/IPv6 socket bound to the local address.

    The socket is bound to ``self._local_ipv6_address`` and
    ``self._local_port``.

    :return: the bound socket, or None when no local IPv6 address is known
        or when creating, binding or configuring the socket fails.  Every
        failure is reported through ``self.warning``.
    """
    if self._local_ipv6_address is None:
        self.warning("Could not create IPv6 UDP socket: don't have a local address")
        return None
    try:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    except (IOError, OSError) as err:
        self.warning("Could not create IPv6 UDP socket: %s", err)
        return None
    self.enable_addr_and_port_reuse(sock)
    try:
        # Only the first resolution result is used; element 4 is the sockaddr.
        sockaddr = socket.getaddrinfo(self._local_ipv6_address,
                                      self._local_port,
                                      socket.AF_INET6,
                                      socket.SOCK_DGRAM)[0][4]
        sock.bind(sockaddr)
    except (IOError, OSError) as err:
        self.warning("Could not bind IPv6 UDP socket to address %s port %d: %s",
                     self._local_ipv6_address, self._local_port, err)
        # The caller only receives None, so close the socket here instead
        # of leaking the descriptor.
        sock.close()
        return None
    try:
        sock.setblocking(0)
    except (IOError, OSError) as err:
        # Message fixed: this is a failure report ("Could not set ...").
        self.warning("Could not set unicast receive IPv6 UDP to non-blocking mode: %s", err)
        sock.close()
        return None
    return sock
开发者ID:brunorijsman,项目名称:rift-python,代码行数:28,代码来源:udp_rx_handler.py


示例21: create_socket_ipv6_tx_mcast

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def create_socket_ipv6_tx_mcast(self, multicast_address, port, loopback):
    """Create a UDP/IPv6 socket for sending to a multicast group.

    The socket sends on the interface ``self._interface_index`` and is
    connected to ``multicast_address`` scoped with
    ``self.physical_interface_name`` (``address%interface``).

    :param multicast_address: IPv6 multicast group; converted with str().
    :param port: destination UDP port.
    :param loopback: truthy to enable IPV6_MULTICAST_LOOP on the socket.
    :return: the connected socket, or None when the interface index is
        unknown or any step fails.  Every failure is reported through
        ``self.warning``.
    """
    if self._interface_index is None:
        self.warning("Could not create IPv6 multicast TX socket: unknown interface index")
        return None
    try:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    except IOError as err:
        self.warning("Could not create IPv6 UDP socket: %s", err)
        return None
    self.enable_addr_and_port_reuse(sock)
    # From here on every failure path closes the socket: the caller only
    # receives None and could not release the descriptor itself.
    try:
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, self._interface_index)
    except IOError as err:
        self.warning("Could not set IPv6 multicast interface index %d: %s",
                     self._interface_index, err)
        sock.close()
        return None
    loop_value = 1 if loopback else 0
    try:
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, loop_value)
    except IOError as err:
        self.warning("Could not set IPv6 multicast loopback value %d: %s", loop_value, err)
        sock.close()
        return None
    try:
        scoped_ipv6_multicast_address = (
            str(multicast_address) + '%' + self.physical_interface_name)
        # Only the first resolution result is used; element 4 is the sockaddr.
        sock_addr = socket.getaddrinfo(scoped_ipv6_multicast_address, port,
                                       socket.AF_INET6, socket.SOCK_DGRAM)[0][4]
        sock.connect(sock_addr)
    except (IOError, OSError) as err:
        self.warning("Could not connect UDP socket to address %s port %d: %s",
                     scoped_ipv6_multicast_address, port, err)
        sock.close()
        return None
    return sock
开发者ID:brunorijsman,项目名称:rift-python,代码行数:35,代码来源:interface.py


示例22: connect

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def connect(self, host=HOST, port=GPSD_PORT):
    """Connect to a host on a given port.

    Every address ``host`` resolves to is tried in order; the first one
    that connects is kept as ``self.streamSock`` (set to non-blocking) and
    the remaining addresses are not tried.  Failures are reported on
    stderr and are not raised.

    Arguments:
        host: default host='127.0.0.1'
        port: default port=2947
    """
    for alotta_stuff in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
        family, socktype, proto, _canonname, host_port = alotta_stuff
        try:
            self.streamSock = socket.socket(family, socktype, proto)
            self.streamSock.connect(host_port)
            self.streamSock.setblocking(False)
        except (OSError, IOError) as error:
            sys.stderr.write('\r\nGPSDSocket.connect exception is--> {}'.format(error))
            sys.stderr.write('\r\nGPS3 gpsd connection at \'{0}\' on port \'{1}\' failed\r\n'.format(host, port))
        else:
            # Connected: stop here.  Carrying on would replace the working
            # socket with one for the next address, which may well fail.
            return
开发者ID:wadda,项目名称:gps3,代码行数:17,代码来源:gps3.py


示例23: connect

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def connect(self, host=HOST, port=GPSD_PORT):
    """Connect to a host on a given port.

    Every address ``host`` resolves to is tried in order; the first one
    that connects is kept as ``self.streamSock`` (set to non-blocking) and
    the remaining addresses are not tried.  Failures are reported on
    stderr and are not raised.

    Arguments:
        host: default host='127.0.0.1'
        port: default port=2947
    """
    for alotta_stuff in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
        family, socktype, proto, _canonname, host_port = alotta_stuff
        try:
            self.streamSock = socket.socket(family, socktype, proto)
            self.streamSock.connect(host_port)
            self.streamSock.setblocking(False)
        except (OSError, IOError) as error:
            sys.stderr.write(f'\r\nGPSDSocket.connect exception is--> {error}')
            sys.stderr.write(f'\r\nAGPS3 connection to gpsd at \'{host}\' on port \'{port}\' failed\r\n')
        else:
            # Connected: stop here.  Carrying on would replace the working
            # socket with one for the next address, which may well fail.
            return
开发者ID:wadda,项目名称:gps3,代码行数:17,代码来源:agps3.py


示例24: resolve

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def resolve(self, host, port, family=socket.AF_UNSPEC):
    """Resolve ``host``/``port`` into a list of ``(family, address)`` pairs.

    ``family`` restricts the lookup; the default ``socket.AF_UNSPEC``
    places no restriction on it.
    """
    # A socket type must be passed: on Solaris getaddrinfo fails when the
    # port is not listed in /etc/services and no type is given.  Which type
    # is used does not seem to matter, because the one reported back is
    # discarded, so the addresses should remain usable with SOCK_DGRAM too.
    entries = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
    return [(entry_family, sockaddr)
            for entry_family, _socktype, _proto, _canonname, sockaddr in entries]
开发者ID:tao12345666333,项目名称:tornado-zh,代码行数:13,代码来源:netutil.py


示例25: _failing_getaddrinfo

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def _failing_getaddrinfo(*args):
    """Stand-in for getaddrinfo in mocks: ignores its arguments and always
    fails with ``socket.gaierror``."""
    lookup_failure = socket.gaierror("mock: lookup failed")
    raise lookup_failure
开发者ID:tao12345666333,项目名称:tornado-zh,代码行数:5,代码来源:netutil_test.py


示例26: setUp

# 需要导入模块: import socket [as 别名]
# 或者: from socket import getaddrinfo [as 别名]
def setUp(self):
        """Run the parent class's setUp, then create the resolver under test.

        The BlockingResolver is built on this test's ``io_loop`` and stored
        as ``self.resolver``.
        """
        super(BlockingResolverTest, self).setUp()
        self.resolver = BlockingResolver(io_loop=self.io_loop)
# getaddrinfo-based tests need mocking to reliably generate errors;
# some configurations are slow to produce errors and take longer than
# our default timeout. 
开发者ID:tao12345666333,项目名称:tornado-zh,代码行数:10,代码来源:netutil_test.py



注:本文中的socket.getaddrinfo方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。