Python socket.IPV6_V6ONLY属性代码示例

本文整理汇总了Python中socket.IPV6_V6ONLY属性的典型用法代码示例。如果您正苦于以下问题:Python socket.IPV6_V6ONLY属性的具体用法?Python socket.IPV6_V6ONLY怎么用?Python socket.IPV6_V6ONLY使用的例子?那么恭喜您, 这里精选的属性代码示例或许可以为您提供帮助。您也可以进一步了解该属性所在模块socket的用法示例。

在下文中一共展示了socket.IPV6_V6ONLY属性的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def bind(self, family, type, proto=0):
        """Create (or recreate) the actual socket object."""
        self.socket = socket.socket(family, type, proto)
        prevent_socket_inheritance(self.socket)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if self.nodelay and not isinstance(self.bind_addr, str):
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        if self.ssl_adapter is not None:
            self.socket = self.ssl_adapter.bind(self.socket)
        # If listening on the IPV6 any address ('::' = IN6ADDR_ANY),
        # activate dual-stack. See https://bitbucket.org/cherrypy/cherrypy/issue/871.
        if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6
            and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')):
            try:
                self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            except (AttributeError, socket.error):
                # Apparently, the socket option is not available in
                # this machine's TCP stack
                pass
        self.socket.bind(self.bind_addr) 
开发者ID:exiahuang,项目名称:SalesforceXyTools,代码行数:25,代码来源:wsgiserver3.py


示例2: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def bind(self, family, type, proto=0):
        """Create (or recreate) the actual socket object."""
        self.socket = socket.socket(family, type, proto)
        prevent_socket_inheritance(self.socket)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if self.nodelay and not isinstance(self.bind_addr, str):
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        if self.ssl_adapter is not None:
            self.socket = self.ssl_adapter.bind(self.socket)
        # If listening on the IPV6 any address ('::' = IN6ADDR_ANY),
        # activate dual-stack. See http://www.cherrypy.org/ticket/871.
        if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6
            and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')):
            try:
                self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            except (AttributeError, socket.error):
                # Apparently, the socket option is not available in
                # this machine's TCP stack
                pass
        self.socket.bind(self.bind_addr) 
开发者ID:joxeankoret,项目名称:nightmare,代码行数:25,代码来源:__init__.py


示例3: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def bind(self, family, type, proto=0):
        """Create (or recreate) the actual socket object."""
        self.socket = socket.socket(family, type, proto)
        prevent_socket_inheritance(self.socket)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if self.nodelay and not isinstance(self.bind_addr, str):
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        if self.ssl_adapter is not None:
            self.socket = self.ssl_adapter.bind(self.socket)
        # If listening on the IPV6 any address ('::' = IN6ADDR_ANY),
        # activate dual-stack. See
        # https://github.com/cherrypy/cherrypy/issues/871.
        if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6
                and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')):
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            except (AttributeError, socket.error):
                # Apparently, the socket option is not available in
                # this machine's TCP stack
                pass
        self.socket.bind(self.bind_addr) 
开发者ID:Naayouu,项目名称:Hatkey,代码行数:27,代码来源:wsgiserver3.py


示例4: get_listen_ip

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def get_listen_ip():
    """ Get IP to listen on for things that don't natively support detecting IPv4/IPv6 dualstack """
    def has_dual_stack():
        if hasattr(socket, 'AF_INET6') and hasattr(socket, 'IPPROTO_IPV6') and hasattr(socket, 'IPV6_V6ONLY'):
            sock = None
            try:
                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
                import urllib3
                return urllib3.util.connection.HAS_IPV6
            except socket.error as e:
                logging.debug('Error when working with ipv6 socket: %s', e)
            finally:
                if sock:
                    sock.close()
        return False
    info = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
    # in case dual stack is not supported we want IPv4 to be preferred over IPv6
    info.sort(key=lambda x: x[0] == socket.AF_INET, reverse=not has_dual_stack())
    return info[0][4][0] 
开发者ID:zalando,项目名称:spilo,代码行数:23,代码来源:configure_spilo.py


示例5: has_dual_stack

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def has_dual_stack(sock=None):
    """Return True if kernel allows creating a socket which is able to
    listen for both IPv4 and IPv6 connections.
    If *sock* is provided the check is made against it.
    """
    try:
        socket.AF_INET6
        socket.IPPROTO_IPV6
        socket.IPV6_V6ONLY
    except AttributeError:
        return False
    try:
        if sock is not None:
            return not sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
        else:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            with contextlib.closing(sock):
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
                return True
    except socket.error:
        return False 
开发者ID:ActiveState,项目名称:code,代码行数:23,代码来源:recipe-578504.py


示例6: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def init_func(self, creator_fd, address, wol_bind_ip, key, is_ipv6=False):
        self.__key = key
        self.__wol_bind_ip = wol_bind_ip
        if is_ipv6:
            fa = socket.AF_INET6
        else:
            fa = socket.AF_INET
        s = socket.socket(fa, socket.SOCK_STREAM)
        if is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.set_socket(s)
        self.bind(address)
        self.listen(10)
        self.register(self.fileno)
        self.add_evt_read(self.fileno)
        return self.fileno 
开发者ID:fdslight,项目名称:fdslight,代码行数:22,代码来源:wol_handler.py


示例7: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def init_func(self, creator_fd, address, auth_id, remote_info, is_ipv6=False):
        self.__is_ipv6 = is_ipv6
        self.__remote_info = remote_info
        self.__auth_id = auth_id
        if is_ipv6:
            fa = socket.AF_INET6
        else:
            fa = socket.AF_INET
        s = socket.socket(fa, socket.SOCK_STREAM)
        if is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.set_socket(s)
        self.bind(address)
        self.listen(10)
        self.register(self.fileno)
        self.add_evt_read(self.fileno)
        return self.fileno 
开发者ID:fdslight,项目名称:fdslight,代码行数:24,代码来源:WANd_raw.py


示例8: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def init_func(self, creator, address, crypto, crypto_configs, conn_timeout=800, is_ipv6=False, over_http=False):
        self.__crypto_configs = crypto_configs
        self.__crypto = crypto
        self.__conn_timeout = conn_timeout
        self.__over_http = over_http
        if is_ipv6:
            fa = socket.AF_INET6
        else:
            fa = socket.AF_INET
        s = socket.socket(fa, socket.SOCK_STREAM)
        if is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.set_socket(s)
        self.bind(address)
        self.listen(10)
        self.register(self.fileno)
        self.add_evt_read(self.fileno)
        return self.fileno 
开发者ID:fdslight,项目名称:fdslight,代码行数:24,代码来源:tunnels.py


示例9: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def init_func(self, creator_fd, address, redirect_address, listen_is_ipv6=False, redirect_is_ipv6=False):
        if listen_is_ipv6:
            fa = socket.AF_INET6
        else:
            fa = socket.AF_INET
        s = socket.socket(fa, socket.SOCK_STREAM)
        if listen_is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.__redirect_is_ipv6 = redirect_is_ipv6
        self.__redirect_address = redirect_address
        self.set_socket(s)
        self.bind(address)
        self.listen(10)
        self.register(self.fileno)
        self.add_evt_read(self.fileno)
        return self.fileno 
开发者ID:fdslight,项目名称:fdslight,代码行数:22,代码来源:relay.py


示例10: bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def bind(self, family, type, proto=0):
        """Create (or recreate) the actual socket object."""
        self.socket = socket.socket(family, type, proto)
        prevent_socket_inheritance(self.socket)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if self.nodelay and not isinstance(self.bind_addr, str):
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        if self.ssl_adapter is not None:
            self.socket = self.ssl_adapter.bind(self.socket)
        # If listening on the IPV6 any address ('::' = IN6ADDR_ANY),
        # activate dual-stack. See
        # https://bitbucket.org/cherrypy/cherrypy/issue/871.
        if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6
                and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')):
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            except (AttributeError, socket.error):
                # Apparently, the socket option is not available in
                # this machine's TCP stack
                pass
        self.socket.bind(self.bind_addr) 
开发者ID:naparuba,项目名称:opsbro,代码行数:27,代码来源:wsgiserver3.py


示例11: create_tcp_socket

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def create_tcp_socket():
    """Create a TCP socket with or without IPv6 depending on system support"""
    if has_ipv6:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        # Explicitly configure socket to work for both IPv4 and IPv6
        if hasattr(socket, "IPPROTO_IPV6"):
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        elif sys.platform == "win32":  # also match 64bit windows.
            # Python 2.7 on windows does not have the IPPROTO_IPV6 constant
            # Use values extracted from Windows Vista/7/8's header
            sock.setsockopt(41, 27, 0)
    else:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    return sock 
开发者ID:mopidy,项目名称:mopidy-mpd,代码行数:17,代码来源:network.py


示例12: _support_hybrid_ipv6

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def _support_hybrid_ipv6():
    """Return True if it is possible to use hybrid IPv6/IPv4 sockets
    on this platform.
    """
    # Note: IPPROTO_IPV6 constant is broken on Windows, see:
    # http://bugs.python.org/issue6926
    try:
        if not socket.has_ipv6:
            return False
        with contextlib.closing(socket.socket(socket.AF_INET6)) as sock:
            return not sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
    except (socket.error, AttributeError):
        return False 
开发者ID:aliyun,项目名称:oss-ftp,代码行数:15,代码来源:handlers.py


示例13: main

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def main():
    addr = config.tun_addr or config.uri_addr
    port = config.tun_port or config.uri_port
    try:
        c = loop.create_server_tfo if config.tfo else loop.create_server
        server = loop.run_until_complete(c(factory, addr, port))
    except OSError:
        die('wstan server failed to bind on %s:%d' % (addr, port))
    so = server.sockets[0]
    if len(server.sockets) == 1 and so.family == socket.AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
        # force user to specify URI in wstan server is a bad design, this try to fix
        # inconvenience in dual stack server
        so.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)  # default 1 in Linux
    loop.set_exception_handler(silent_timeout_err_handler)
    async_(clean_seen_nonce())
    print('wstan server -- listening on %s:%d' % (addr, port))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        loop.close() 
开发者ID:krrr,项目名称:wstan,代码行数:28,代码来源:server.py


示例14: server_bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def server_bind(self):
        if self.address_family == socket.AF_INET6:
            # Only allow IPv6 connections to the IPv6 socket
            self.socket.setsockopt(COMPAT_IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        super().server_bind() 
开发者ID:etesync,项目名称:etesync-dav,代码行数:7,代码来源:server.py


示例15: _listen_tcp

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def _listen_tcp(self, loc_addr, conn_handle):
        """Creates a TCP server socket which listens on `port` number.
        For each connection `server_factory` starts a new protocol.
        """
        info = socket.getaddrinfo(None, loc_addr[1], socket.AF_UNSPEC,
                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
        listen_sockets = {}
        for res in info:
            af, socktype, proto, cannonname, sa = res
            sock = None
            try:
                sock = socket.socket(af, socktype, proto)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if af == socket.AF_INET6:
                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
                sock.bind(sa)
                sock.listen(50)
                listen_sockets[sa] = sock
            except socket.error:
                if sock:
                    sock.close()
        count = 0
        server = None
        for sa in listen_sockets.keys():
            name = self.name + '_server@' + str(sa[0])
            self._asso_socket_map[name] = listen_sockets[sa]
            if count == 0:
                import eventlet
                server = eventlet.spawn(self._listen_socket_loop,
                                        listen_sockets[sa], conn_handle)
                self._child_thread_map[name] = server
                count += 1
            else:
                server = self._spawn(name, self._listen_socket_loop,
                                     listen_sockets[sa], conn_handle)
        return server, listen_sockets 
开发者ID:OpenState-SDN,项目名称:ryu,代码行数:42,代码来源:base.py


示例16: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def init_func(self, creator, listen, is_ipv6=False):
        if is_ipv6:
            fa = socket.AF_INET6
        else:
            fa = socket.AF_INET
        s = socket.socket(fa, socket.SOCK_STREAM)
        if is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.set_socket(s)
        self.bind(listen)
        return self.fileno 
开发者ID:fdslight,项目名称:fdslight,代码行数:16,代码来源:websocket.py


示例17: init_func

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def init_func(self, creator, address, debug=False, server_side=False, is_ipv6=False):
        if is_ipv6:
            fa = socket.AF_INET6
        else:
            fa = socket.AF_INET
        self.__is_ipv6 = is_ipv6
        s = socket.socket(fa, socket.SOCK_DGRAM)
        if server_side and is_ipv6:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.set_socket(s)
        self.__server_side = server_side
        if server_side:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.bind((address, 53))
        else:
            self.connect((address, 53))
        self.__debug = debug
        self.__timer = timer.timer()
        self.__ip_match = ip_match.ip_match()
        self.__host_match = host_match.host_match()
        self.set_timeout(self.fileno, self.__LOOP_TIMEOUT)
        self.register(self.fileno)
        self.add_evt_read(self.fileno)
        return self.fileno 
开发者ID:fdslight,项目名称:fdslight,代码行数:34,代码来源:dns_proxy.py


示例18: server_bind

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def server_bind(self):
        """Override to enable IPV4 mapping for IPV6 sockets when desired.
        The main use case for this is so that when no host is specified,
        TensorBoard can listen on all interfaces for both IPv4 and IPv6
        connections, rather than having to choose v4 or v6 and hope the
        browser didn't choose the other one.
        """
        socket_is_v6 = (
            hasattr(socket, "AF_INET6")
            and self.socket.family == socket.AF_INET6
        )
        has_v6only_option = hasattr(socket, "IPPROTO_IPV6") and hasattr(
            socket, "IPV6_V6ONLY"
        )
        if self._auto_wildcard and socket_is_v6 and has_v6only_option:
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0
                )
            except socket.error as e:
                # Log a warning on failure to dual-bind, except for EAFNOSUPPORT
                # since that's expected if IPv4 isn't supported at all (IPv6-only).
                if (
                    hasattr(errno, "EAFNOSUPPORT")
                    and e.errno != errno.EAFNOSUPPORT
                ):
                    logger.warning(
                        "Failed to dual-bind to IPv4 wildcard: %s", str(e)
                    )
        super(WerkzeugServer, self).server_bind() 
开发者ID:tensorflow,项目名称:tensorboard,代码行数:33,代码来源:program.py


示例19: init_v6

# 需要导入模块: import socket [as 别名]
# 或者: from socket import IPV6_V6ONLY [as 别名]
def init_v6(self):
        idx = socket.if_nametoindex(self.interface.name)
        self.multicast_address = (WSD_MCAST_GRP_V6, WSD_UDP_PORT, 0x575C, idx)
        # v6: member_request = { multicast_addr, intf_idx }
        mreq = (
            socket.inet_pton(self.family, WSD_MCAST_GRP_V6) +
            struct.pack('@I', idx))
        self.recv_socket.setsockopt(
            socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
        self.recv_socket.setsockopt(
            socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        # bind to network interface, i.e. scope and handle OS differences,
        # see Stevens: Unix Network Programming, Section 21.6, last paragraph
        try:
            self.recv_socket.bind((WSD_MCAST_GRP_V6, WSD_UDP_PORT, 0, idx))
        except OSError:
            self.recv_socket.bind(('::', 0, 0, idx))
        self.send_socket.setsockopt(
            socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 0)
        self.send_socket.setsockopt(
            socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, args.hoplimit)
        self.send_socket.setsockopt(
            socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, idx)
        self.transport_address = '[{0}]'.format(self.address)
        self.listen_address = (self.address, WSD_HTTP_PORT, 0, idx) 
开发者ID:christgau,项目名称:wsdd,代码行数:31,代码来源:wsdd.py



注:本文中的socket.IPV6_V6ONLY属性示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。