Python socket.IPPROTO_IPV6属性代码示例

本文整理汇总了Python中socket.IPPROTO_IPV6属性的典型用法代码示例。如果您正苦于以下问题:Python socket.IPPROTO_IPV6属性的具体用法?Python socket.IPPROTO_IPV6怎么用?Python socket.IPPROTO_IPV6使用的例子?那么恭喜您,这里精选的属性代码示例或许可以为您提供帮助。您也可以进一步了解该属性所在模块socket的用法示例。

在下文中一共展示了socket.IPPROTO_IPV6属性的21个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _create_ipv6_sockets

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def _create_ipv6_sockets(loopback_enabled):
    """Create a sender/receiver pair of IPv6 UDP multicast sockets.

    Both sockets use the interface named by ``find_ethernet_interface()``
    and the fixed group ``ff02::abcd:99`` on port 30000. The sender is
    connected to the group; the receiver joins the group and is bound to
    the wildcard address ``::`` on the same port.

    Args:
        loopback_enabled: Truthy to set IPV6_MULTICAST_LOOP to 1 on the
            sending socket, falsy to set it to 0.

    Returns:
        A ``(txsock, rxsock)`` tuple: the sending socket followed by the
        receiving socket.
    """
    ifname = find_ethernet_interface()
    ifindex = socket.if_nametoindex(ifname)
    mcast_address = "ff02::abcd:99"
    port = 30000

    # Sending side: multicast out of the chosen interface, loopback as requested.
    sender = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sender.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sender.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, ifindex)
    loop_flag = 1 if loopback_enabled else 0
    sender.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, loop_flag)
    sender.connect((mcast_address, port))

    # Receiving side: join the same group on the same interface.
    receiver = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    packed_group = socket.inet_pton(socket.AF_INET6, mcast_address)
    # Membership request: 16-byte group address followed by the interface index.
    membership = struct.pack("=16si", packed_group, ifindex)
    if platform.system() == "Darwin":
        # On Darwin the join option is looked up as IPV6_JOIN_GROUP.
        join_option = socket.IPV6_JOIN_GROUP
    else:
        join_option = socket.IPV6_ADD_MEMBERSHIP
    receiver.setsockopt(socket.IPPROTO_IPV6, join_option, membership)
    receiver.bind(("::", port))

    return (sender, receiver)
开发者ID:brunorijsman,项目名称:rift-python,代码行数:26,代码来源:multicast_checks.py


示例2: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def bind(self, family, type, proto=0):
    """Replace ``self.socket`` with a newly configured socket bound to ``self.bind_addr``.

    Args:
        family: Address family passed to ``socket.socket``.
        type: Socket type passed to ``socket.socket``.
        proto: Protocol number passed to ``socket.socket`` (default 0).
    """
    self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(self.socket)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is only requested when bind_addr is not a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    if self.ssl_adapter is not None:
        # Whatever the adapter's bind() returns is the socket used from here on.
        self.socket = self.ssl_adapter.bind(self.socket)

    # Listening on the IPv6 any address ('::' = IN6ADDR_ANY) activates
    # dual-stack. See https://bitbucket.org/cherrypy/cherrypy/issue/871.
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')
    )
    if wants_dual_stack:
        try:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without dual-stack.
            pass

    self.socket.bind(self.bind_addr)
开发者ID:exiahuang,项目名称:SalesforceXyTools,代码行数:25,代码来源:wsgiserver3.py


示例3: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def bind(self, family, type, proto=0):
    """Create the server socket afresh, apply its options and bind it.

    The result is stored in ``self.socket`` (after optional wrapping by
    ``self.ssl_adapter``) and bound to ``self.bind_addr``.

    Args:
        family: Address family for the new socket.
        type: Socket type for the new socket.
        proto: Protocol number for the new socket (default 0).
    """
    self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(self.socket)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    if self.nodelay and not isinstance(self.bind_addr, str):
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    if self.ssl_adapter is not None:
        self.socket = self.ssl_adapter.bind(self.socket)

    # When listening on the IPv6 any address ('::' = IN6ADDR_ANY),
    # turn on dual-stack. See http://www.cherrypy.org/ticket/871.
    if hasattr(socket, 'AF_INET6'):
        if family == socket.AF_INET6:
            if self.bind_addr[0] in ('::', '::0', '::0.0.0.0'):
                try:
                    self.socket.setsockopt(
                        socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
                except (AttributeError, socket.error):
                    # This machine's TCP stack apparently lacks the
                    # option, so it is skipped.
                    pass

    self.socket.bind(self.bind_addr)
开发者ID:joxeankoret,项目名称:nightmare,代码行数:25,代码来源:__init__.py


示例4: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def bind(self, family, type, proto=0):
    """Set up a new listening socket on ``self.socket`` and bind it.

    Args:
        family: Address family handed to ``socket.socket``.
        type: Socket type handed to ``socket.socket``.
        proto: Protocol number handed to ``socket.socket`` (default 0).
    """
    # Host strings that mean "IPv6 any address" ('::' = IN6ADDR_ANY).
    ipv6_any_hosts = ('::', '::0', '::0.0.0.0')

    self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(self.socket)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    if self.nodelay and not isinstance(self.bind_addr, str):
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    if self.ssl_adapter is not None:
        self.socket = self.ssl_adapter.bind(self.socket)

    # Dual-stack is activated when listening on the IPv6 any address.
    # See https://github.com/cherrypy/cherrypy/issues/871.
    listening_on_ipv6_any = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_any_hosts
    )
    if listening_on_ipv6_any:
        try:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # Apparently this machine's TCP stack does not offer the
            # option; binding proceeds regardless.
            pass

    self.socket.bind(self.bind_addr)
开发者ID:Naayouu,项目名称:Hatkey,代码行数:27,代码来源:wsgiserver3.py


示例5: get_listen_ip

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def get_listen_ip():
    """Return the address to listen on for services that cannot detect dual-stack themselves.

    The passive wildcard candidates from ``getaddrinfo`` are ordered so that
    the IPv4 entry comes first when dual-stack is not available and last
    when it is; the address of the first entry is returned.
    """

    def has_dual_stack():
        # All three names must exist in the socket module before probing.
        required = ('AF_INET6', 'IPPROTO_IPV6', 'IPV6_V6ONLY')
        if not all(hasattr(socket, name) for name in required):
            return False

        probe = None
        try:
            probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            probe.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
            import urllib3
            return urllib3.util.connection.HAS_IPV6
        except socket.error as err:
            logging.debug('Error when working with ipv6 socket: %s', err)
        finally:
            if probe:
                probe.close()
        return False

    candidates = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
    # Without dual stack, IPv4 should be preferred over IPv6.
    ipv4_first = not has_dual_stack()
    candidates.sort(key=lambda entry: entry[0] == socket.AF_INET, reverse=ipv4_first)
    return candidates[0][4][0]
开发者ID:zalando,项目名称:spilo,代码行数:23,代码来源:configure_spilo.py


示例6: _init_advertiser

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def _init_advertiser(self):
    """Prepare the advertiser socket and store it in ``self._sock``.

    The socket from ``_find_sock()`` gets address reuse and multicast
    loopback enabled, is bound to ``self.port`` on all addresses, and then
    joins every group resolved from ``self.addresses``. A failed join is
    re-raised unless ``self.ignore_unavailable`` is set.
    """
    listener = _find_sock()
    enabled = struct.pack('@I', 1)

    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, enabled)
    listener.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, enabled)
    if listener.family == socket.AF_INET6:
        listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, enabled)
    listener.bind(('', self.port))

    resolved = _resolve_addrs(self.addresses, None, self.ignore_unavailable, (listener.family,))
    # Only the original family/address of each resolved entry is needed.
    for _fam, _to, group_family, group_addr in resolved:
        try:
            _multicast_join_group(listener, group_family, group_addr)
        except socket.error:
            if self.ignore_unavailable:
                continue
            raise

    self._sock = listener
开发者ID:soravux,项目名称:scoop,代码行数:20,代码来源:minusconf.py


示例7: _testOddCmsgSize

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def _testOddCmsgSize(self):
    # Send MSG with two ancillary items where the traffic-class item
    # carries one extra trailing byte.  If that send raises OSError, the
    # send is repeated with both items unpadded and the byte count is
    # then checked against len(MSG).
    self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    def int_payload(value):
        # A single native int held in an array object.
        return array.array("i", [value])

    level = socket.IPPROTO_IPV6
    try:
        padded_tclass = int_payload(self.traffic_class).tobytes() + b"\x00"
        odd_ancdata = [
            (level, socket.IPV6_TCLASS, padded_tclass),
            (level, socket.IPV6_HOPLIMIT, int_payload(self.hop_limit)),
        ]
        nbytes = self.sendmsgToServer([MSG], odd_ancdata)
    except OSError as e:
        self.assertIsInstance(e.errno, int)
        plain_ancdata = [
            (level, socket.IPV6_TCLASS, int_payload(self.traffic_class)),
            (level, socket.IPV6_HOPLIMIT, int_payload(self.hop_limit)),
        ]
        nbytes = self.sendmsgToServer([MSG], plain_ancdata)
        self.assertEqual(nbytes, len(MSG))
    # Tests for proper handling of truncated ancillary data 
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:22,代码来源:test_socket.py


示例8: testSingleCmsgTruncInData

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def testSingleCmsgTruncInData(self):
    # Check truncation that falls inside a control message's data.
    # The ancillary buffer is one byte too small for a full int, so the
    # message may come back with shortened data or not come back at all.
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
    self.misc_event.set()

    ancbuf_size = socket.CMSG_LEN(SIZEOF_INT) - 1
    msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG), ancbuf_size)

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
    self.assertLessEqual(len(ancdata), 1)

    if not ancdata:
        return

    # An item did arrive: it must be the hop limit with truncated data.
    level, kind, payload = ancdata[0]
    self.assertEqual(level, socket.IPPROTO_IPV6)
    self.assertEqual(kind, socket.IPV6_HOPLIMIT)
    self.assertLess(len(payload), SIZEOF_INT)
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:22,代码来源:test_socket.py


示例9: has_dual_stack

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def has_dual_stack(sock=None):
    """Tell whether a socket can listen for IPv4 and IPv6 connections at once.

    Args:
        sock: Optional existing socket. When given, its current
            IPV6_V6ONLY setting is inspected; otherwise a temporary
            AF_INET6 stream socket is created and the option is cleared
            on it as a probe.

    Returns:
        True when dual-stack is available, False when the socket module
        lacks the needed constants or the socket call fails.
    """
    required = ("AF_INET6", "IPPROTO_IPV6", "IPV6_V6ONLY")
    if not all(hasattr(socket, name) for name in required):
        return False

    try:
        if sock is None:
            probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            with contextlib.closing(probe):
                probe.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
            return True
        # Dual-stack is on exactly when the V6ONLY flag reads as false.
        return not sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
    except socket.error:
        return False
开发者ID:ActiveState,项目名称:code,代码行数:23,代码来源:recipe-578504.py


示例10: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def init_func(self, creator_fd, address, wol_bind_ip, key, is_ipv6=False):
    """Create the listening TCP socket for this handler and register it for reads.

    Args:
        creator_fd: Not used by this method.
        address: Address handed to ``self.bind``.
        wol_bind_ip: Stored on the instance for later use.
        key: Stored on the instance for later use.
        is_ipv6: When true the socket is AF_INET6 with IPV6_V6ONLY set,
            otherwise AF_INET.

    Returns:
        ``self.fileno`` after the socket has been registered.
    """
    self.__key = key
    self.__wol_bind_ip = wol_bind_ip

    family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
    listener = socket.socket(family, socket.SOCK_STREAM)
    if is_ipv6:
        listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    self.set_socket(listener)
    self.bind(address)
    self.listen(10)

    self.register(self.fileno)
    self.add_evt_read(self.fileno)
    return self.fileno
开发者ID:fdslight,项目名称:fdslight,代码行数:22,代码来源:wol_handler.py


示例11: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def init_func(self, creator_fd, address, auth_id, remote_info, is_ipv6=False):
    """Open the listening TCP socket (with TCP_NODELAY) and register it for reads.

    Args:
        creator_fd: Not used by this method.
        address: Address handed to ``self.bind``.
        auth_id: Stored on the instance for later use.
        remote_info: Stored on the instance for later use.
        is_ipv6: When true the socket is AF_INET6 with IPV6_V6ONLY set,
            otherwise AF_INET. The flag itself is also stored.

    Returns:
        ``self.fileno`` after the socket has been registered.
    """
    self.__is_ipv6 = is_ipv6
    self.__remote_info = remote_info
    self.__auth_id = auth_id

    family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
    listener = socket.socket(family, socket.SOCK_STREAM)
    if is_ipv6:
        listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    # Options applied to either address family.
    for level, option in (
        (socket.SOL_SOCKET, socket.SO_REUSEADDR),
        (socket.IPPROTO_TCP, socket.TCP_NODELAY),
    ):
        listener.setsockopt(level, option, 1)

    self.set_socket(listener)
    self.bind(address)
    self.listen(10)

    self.register(self.fileno)
    self.add_evt_read(self.fileno)
    return self.fileno
开发者ID:fdslight,项目名称:fdslight,代码行数:24,代码来源:WANd_raw.py


示例12: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def init_func(self, creator, address, crypto, crypto_configs, conn_timeout=800, is_ipv6=False, over_http=False):
    """Record the tunnel settings, then open and register the listening TCP socket.

    Args:
        creator: Not used by this method.
        address: Address handed to ``self.bind``.
        crypto: Stored on the instance for later use.
        crypto_configs: Stored on the instance for later use.
        conn_timeout: Stored on the instance for later use (default 800).
        is_ipv6: When true the socket is AF_INET6 with IPV6_V6ONLY set,
            otherwise AF_INET.
        over_http: Stored on the instance for later use.

    Returns:
        ``self.fileno`` after the socket has been registered.
    """
    self.__crypto_configs = crypto_configs
    self.__crypto = crypto
    self.__conn_timeout = conn_timeout
    self.__over_http = over_http

    if not is_ipv6:
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    else:
        listener = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    self.set_socket(listener)
    self.bind(address)
    self.listen(10)

    self.register(self.fileno)
    self.add_evt_read(self.fileno)
    return self.fileno
开发者ID:fdslight,项目名称:fdslight,代码行数:24,代码来源:tunnels.py


示例13: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def init_func(self, creator_fd, address, redirect_address, listen_is_ipv6=False, redirect_is_ipv6=False):
    """Open the relay's listening TCP socket and register it for reads.

    Args:
        creator_fd: Not used by this method.
        address: Address handed to ``self.bind``.
        redirect_address: Stored on the instance for later use.
        listen_is_ipv6: When true the listening socket is AF_INET6 with
            IPV6_V6ONLY set, otherwise AF_INET.
        redirect_is_ipv6: Stored on the instance for later use.

    Returns:
        ``self.fileno`` after the socket has been registered.
    """
    family = socket.AF_INET6 if listen_is_ipv6 else socket.AF_INET
    listener = socket.socket(family, socket.SOCK_STREAM)
    if listen_is_ipv6:
        listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    self.__redirect_is_ipv6 = redirect_is_ipv6
    self.__redirect_address = redirect_address

    self.set_socket(listener)
    self.bind(address)
    self.listen(10)

    self.register(self.fileno)
    self.add_evt_read(self.fileno)
    return self.fileno
开发者ID:fdslight,项目名称:fdslight,代码行数:22,代码来源:relay.py


示例14: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def bind(self, family, type, proto=0):
    """Make a new socket, configure it, keep it as ``self.socket`` and bind it.

    Args:
        family: Address family given to ``socket.socket``.
        type: Socket type given to ``socket.socket``.
        proto: Protocol number given to ``socket.socket`` (default 0).
    """
    self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(self.socket)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    nodelay_applies = self.nodelay and not isinstance(self.bind_addr, str)
    if nodelay_applies:
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    if self.ssl_adapter is not None:
        self.socket = self.ssl_adapter.bind(self.socket)

    # For the IPv6 any address ('::' = IN6ADDR_ANY) dual-stack is
    # activated. See https://bitbucket.org/cherrypy/cherrypy/issue/871.
    is_ipv6_family = hasattr(socket, 'AF_INET6') and family == socket.AF_INET6
    if is_ipv6_family and self.bind_addr[0] in ('::', '::0', '::0.0.0.0'):
        try:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The socket option is apparently not available in this
            # machine's TCP stack, so the socket stays as it is.
            pass

    self.socket.bind(self.bind_addr)
开发者ID:naparuba,项目名称:opsbro,代码行数:27,代码来源:wsgiserver3.py


示例15: create_socket_ipv6_tx_mcast

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def create_socket_ipv6_tx_mcast(self, multicast_address, port, loopback):
    """Create a connected UDP socket for sending IPv6 multicast on this interface.

    The socket sends through ``self._interface_index``, has multicast
    loopback set as requested, and is connected to ``multicast_address``
    scoped to ``self.physical_interface_name``.

    Args:
        multicast_address: IPv6 multicast group; converted with ``str()``.
        port: Destination UDP port.
        loopback: Truthy to set IPV6_MULTICAST_LOOP to 1, falsy for 0.

    Returns:
        The connected socket, or None if any step fails. On failure a
        warning is reported through ``self.warning`` and a socket that was
        already created is closed before returning.
    """
    if self._interface_index is None:
        self.warning("Could not create IPv6 multicast TX socket: unknown interface index")
        return None
    try:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    except IOError as err:
        self.warning("Could not create IPv6 UDP socket: %s", err)
        return None
    self.enable_addr_and_port_reuse(sock)
    try:
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, self._interface_index)
    except IOError as err:
        self.warning("Could not set IPv6 multicast interface index %d: %s",
                     self._interface_index, err)
        # Close explicitly rather than leaving the descriptor to garbage collection.
        sock.close()
        return None
    loop_value = 1 if loopback else 0
    try:
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, loop_value)
    except IOError as err:
        self.warning("Could not set IPv6 multicast loopback value %d: %s", loop_value, err)
        sock.close()
        return None
    # Built before the try so the name is always bound in the handler below.
    scoped_ipv6_multicast_address = (
        str(multicast_address) + '%' + self.physical_interface_name)
    try:
        sock_addr = socket.getaddrinfo(scoped_ipv6_multicast_address, port,
                                       socket.AF_INET6, socket.SOCK_DGRAM)[0][4]
        sock.connect(sock_addr)
    except (IOError, OSError) as err:
        self.warning("Could not connect UDP socket to address %s port %d: %s",
                     scoped_ipv6_multicast_address, port, err)
        sock.close()
        return None
    return sock
开发者ID:brunorijsman,项目名称:rift-python,代码行数:35,代码来源:interface.py


示例16: _recv

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def _recv(self, p, x=MTU):
    """Return the IPv6 payload of a matching IP packet, else ``p`` unchanged.

    A packet matches when it is an ``IP`` instance whose source equals
    ``self.dst``, whose protocol is IPPROTO_IPV6 and whose payload is an
    ``IPv6`` instance. ``x`` is not used by this method.
    """
    if p is None or not isinstance(p, IP):
        return p
    # TODO: verify checksum
    from_peer_as_ipv6 = p.src == self.dst and p.proto == socket.IPPROTO_IPV6
    if from_peer_as_ipv6 and isinstance(p.payload, IPv6):
        return p.payload
    return p
开发者ID:medbenali,项目名称:CyberScan,代码行数:11,代码来源:inet6.py


示例17: send

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def send(self, x):
    """Wrap ``x`` in an IP header marked IPPROTO_IPV6 and send it via ``self.worker``.

    Returns whatever ``self.worker.send`` returns.
    """
    outer = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
    return self.worker.send(outer / x)
#############################################################################
#############################################################################
###                          Layers binding                               ###
#############################################################################
############################################################################# 
开发者ID:medbenali,项目名称:CyberScan,代码行数:11,代码来源:inet6.py


示例18: create_tcp_socket

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def create_tcp_socket():
    """Return a new TCP socket, IPv6 dual-stack when ``has_ipv6`` is set, else IPv4.

    SO_REUSEADDR is enabled on the socket in both cases.
    """
    if not has_ipv6:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    else:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        # Make the socket explicitly serve both IPv4 and IPv6.
        if hasattr(socket, "IPPROTO_IPV6"):
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        elif sys.platform == "win32":  # also matches 64bit windows.
            # Python 2.7 on windows lacks the IPPROTO_IPV6 constant, so the
            # numeric values taken from the Windows Vista/7/8 header are used.
            sock.setsockopt(41, 27, 0)

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    return sock
开发者ID:mopidy,项目名称:mopidy-mpd,代码行数:17,代码来源:network.py


示例19: _support_hybrid_ipv6

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def _support_hybrid_ipv6():
    """Report whether hybrid IPv6/IPv4 sockets can be used on this platform.

    Returns True when a new AF_INET6 socket reports IPV6_V6ONLY as off,
    and False when IPv6 is unavailable or the probe fails.
    """
    # Note: the IPPROTO_IPV6 constant is broken on Windows, see:
    # http://bugs.python.org/issue6926
    try:
        if socket.has_ipv6:
            probe = socket.socket(socket.AF_INET6)
            with contextlib.closing(probe):
                v6only = probe.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
            return not v6only
        return False
    except (socket.error, AttributeError):
        return False
开发者ID:aliyun,项目名称:oss-ftp,代码行数:15,代码来源:handlers.py


示例20: main

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def main():
    """Start the wstan server, run the event loop until interrupted, then shut down.

    The tunnel address/port from ``config`` take precedence over the URI
    address/port. A bind failure (OSError) is reported through ``die``.
    """
    addr = config.tun_addr or config.uri_addr
    port = config.tun_port or config.uri_port

    try:
        if config.tfo:
            make_server = loop.create_server_tfo
        else:
            make_server = loop.create_server
        server = loop.run_until_complete(make_server(factory, addr, port))
    except OSError:
        die('wstan server failed to bind on %s:%d' % (addr, port))

    first_sock = server.sockets[0]
    lone_ipv6_listener = len(server.sockets) == 1 and first_sock.family == socket.AF_INET6
    if lone_ipv6_listener and hasattr(socket, 'IPPROTO_IPV6'):
        # Forcing the user to specify the URI in a wstan server is a bad
        # design; this tries to fix the inconvenience for a dual stack server.
        first_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)  # default 1 in Linux

    loop.set_exception_handler(silent_timeout_err_handler)
    async_(clean_seen_nonce())
    print('wstan server -- listening on %s:%d' % (addr, port))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        loop.close()
开发者ID:krrr,项目名称:wstan,代码行数:28,代码来源:server.py


示例21: close

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPPROTO_IPV6 [as 别名]
def close(self):
    """Stop the background threads and stop this instance from serving
    further queries. Calling it again after the first time does nothing."""
    if self._GLOBAL_DONE:
        return
    self._GLOBAL_DONE = True

    # remove service listeners
    self.remove_all_service_listeners()
    self.unregister_all_services()

    # shutdown recv socket and thread
    self.engine.del_reader(self._listen_socket)
    if self.address_family == socket.AF_INET:
        membership = socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0')
        self._listen_socket.setsockopt(socket.SOL_IP, socket.IP_DROP_MEMBERSHIP, membership)
    else:
        membership = socket.inet_pton(socket.AF_INET6, _MDNS_ADDR_IPV6) + self.ifn
        self._listen_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_LEAVE_GROUP, membership)
    self._listen_socket.close()
    self.engine.join()

    # shutdown the rest
    self.notify_all()
    self.reaper.join()
    for responder in self._respond_sockets:
        responder.close()
开发者ID:hexway,项目名称:apple_bleee,代码行数:29,代码来源:zeroconf.py



注:本文中的socket.IPPROTO_IPV6属性示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。