Python socket.socketpair方法代码示例

本文整理汇总了Python中socket.socketpair方法的典型用法代码示例。如果您正苦于以下问题:Python socket.socketpair方法的具体用法?Python socket.socketpair怎么用?Python socket.socketpair使用的例子?那么恭喜您,这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块socket的用法示例。

在下文中一共展示了socket.socketpair方法的21个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def __init__(self, app, QAbstractSocket):
        """Wire Python's signal wakeup descriptor to a Qt socket.

        A datagram socket pair is created. Its read end is handed to a
        ``QAbstractSocket`` (constructed with *app*), and its write end is
        registered through ``signal.set_wakeup_fd``. The value returned by
        that call is kept in ``self.old_fd``.
        """
        self._app = app
        self.old_fd = None

        # One end of the pair is for Python (write), the other for Qt (read).
        write_end, read_end = socketpair(type=SOCK_DGRAM)
        self.wsock = write_end
        self.rsock = read_end

        # Qt listens on the read end.
        qt_socket = QAbstractSocket(QAbstractSocket.UdpSocket, app)
        self.socket = qt_socket
        qt_socket.setSocketDescriptor(read_end.fileno())

        # Python writes to the other end; it must never block.
        write_end.setblocking(False)
        self.old_fd = signal.set_wakeup_fd(write_end.fileno())

        # The first Python code executed receives any exception raised by
        # the signal handler, so a do-nothing slot is connected first ...
        qt_socket.readyRead.connect(lambda: None)
        # ... and the slot doing the real work is connected second.
        qt_socket.readyRead.connect(self._readSignal)
开发者ID:mherrmann,项目名称:fbs,代码行数:18,代码来源:_signal.py


示例2: test_read_while_writeable

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test_read_while_writeable(self):
        """A handler registered for READ only must not see write events.

        The handler is added for ``IOLoop.READ`` on one end of a socket pair;
        data is sent from the other end shortly afterwards, and the handler
        asserts that the reported events are exactly ``IOLoop.READ``.
        (The reverse direction is difficult to test for.)
        """
        reader, writer = socket.socketpair()
        try:
            def on_events(fd, events):
                self.assertEqual(events, IOLoop.READ)
                self.stop()

            self.io_loop.add_handler(reader.fileno(), on_events, IOLoop.READ)
            # Send a few bytes a little later so the loop is already waiting.
            send_later = functools.partial(writer.send, b'asdf')
            self.io_loop.add_timeout(self.io_loop.time() + 0.01, send_later)
            self.wait()
            self.io_loop.remove_handler(reader.fileno())
        finally:
            reader.close()
            writer.close()
开发者ID:tao12345666333,项目名称:tornado-zh,代码行数:19,代码来源:ioloop_test.py


示例3: testDaemonConnectedSocket

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def testDaemonConnectedSocket(self):
        """Check the transport server a Daemon uses with and without a connected socket.

        For each ``SERVERTYPE`` ("thread", "multiplex") a plain daemon must
        use a transport server whose class name contains the matching marker,
        while a daemon created with ``connected_socket`` must instead use a
        class whose name contains "Existing".  ``SERVERTYPE`` is always reset
        to "thread" on exit.
        """
        try:
            for server_type, marker in (("thread", "Thread"), ("multiplex", "Multiplex")):
                Pyro5.config.SERVERTYPE = server_type
                with Pyro5.server.Daemon() as d:
                    assert marker in d.transportServer.__class__.__name__
                s1, s2 = socket.socketpair()
                try:
                    with Pyro5.server.Daemon(connected_socket=s1) as d:
                        assert d.locationStr == "./u:<<not-bound>>" or d.locationStr.startswith("127.0.")
                        assert marker not in d.transportServer.__class__.__name__
                        assert "Existing" in d.transportServer.__class__.__name__
                finally:
                    # Release both ends of the pair so the test does not leak
                    # descriptors.  socket.close() may be called repeatedly,
                    # so this is safe even if s1 was already closed elsewhere.
                    s1.close()
                    s2.close()
        finally:
            Pyro5.config.SERVERTYPE = "thread"
开发者ID:irmen,项目名称:Pyro5,代码行数:22,代码来源:test_daemon.py


示例4: test_issue30058

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test_issue30058(self):
        # changelist must be an iterable
        kq = select.kqueue()
        a, b = socket.socketpair()
        ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        # not a list
        kq.control((ev,), 0)
        # __len__ is not consistent with __iter__
        class BadList:
            def __len__(self):
                return 0
            def __iter__(self):
                for i in range(100):
                    yield ev
        kq.control(BadList(), 0)
        # doesn't have __len__
        kq.control(iter([ev]), 0)
        a.close()
        b.close()
        kq.close() 
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:25,代码来源:test_kqueue.py


示例5: Pipe

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def Pipe(duplex=True):
        """Build a pipe and return one connection object per end.

        When *duplex* is true the pipe is a blocking socket pair and each
        connection wraps a duplicate of one socket's descriptor.  Otherwise
        ``os.pipe()`` is used: the first connection is created with
        ``writable=False`` and the second with ``readable=False``.
        """
        if not duplex:
            read_fd, write_fd = os.pipe()
            reader = _multiprocessing.Connection(read_fd, writable=False)
            writer = _multiprocessing.Connection(write_fd, readable=False)
            return reader, writer

        left, right = socket.socketpair()
        for sock in (left, right):
            sock.setblocking(True)
        # The connections are given duplicated descriptors, so the socket
        # objects themselves are closed once both connections exist.
        conn_left = _multiprocessing.Connection(os.dup(left.fileno()))
        conn_right = _multiprocessing.Connection(os.dup(right.fileno()))
        left.close()
        right.close()
        return conn_left, conn_right
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:20,代码来源:connection.py


示例6: __init__

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def __init__(self,
                 processor,
                 lsocket,
                 inputProtocolFactory=None,
                 outputProtocolFactory=None,
                 threads=10):
        """Store the server configuration and create its internal helpers.

        *processor* and *lsocket* are stored as given.  A falsy
        *inputProtocolFactory* is replaced by ``TBinaryProtocolFactory()``;
        a falsy *outputProtocolFactory* falls back to the input factory.
        *threads* is converted with ``int``.  A task queue and a socket pair
        (``self._read`` / ``self._write``) are created as well.
        """
        self.processor = processor
        self.socket = lsocket

        if inputProtocolFactory:
            self.in_protocol = inputProtocolFactory
        else:
            self.in_protocol = TBinaryProtocolFactory()
        if outputProtocolFactory:
            self.out_protocol = outputProtocolFactory
        else:
            self.out_protocol = self.in_protocol

        self.threads = int(threads)
        self.clients = {}
        self.tasks = Queue.Queue()

        wake_pair = socket.socketpair()
        self._read = wake_pair[0]
        self._write = wake_pair[1]

        self.prepared = False
        self._stop = False
开发者ID:XiaoMi,项目名称:galaxy-sdk-python,代码行数:18,代码来源:TNonblockingServer.py


示例7: test_read_while_writeable

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test_read_while_writeable(self):
        """Write events must not be delivered to a READ-only handler.

        One end of a socket pair is registered for ``IOLoop.READ``; the
        other end sends data after a short delay.  The handler checks that
        the events it receives equal ``IOLoop.READ`` and stops the wait.
        (The reverse case is difficult to test for.)
        """
        recv_side, send_side = socket.socketpair()
        try:
            def check_events(fd, events):
                self.assertEqual(events, IOLoop.READ)
                self.stop()

            self.io_loop.add_handler(recv_side.fileno(), check_events, IOLoop.READ)
            fire_at = self.io_loop.time() + 0.01
            self.io_loop.add_timeout(fire_at, functools.partial(send_side.send, b"asdf"))
            self.wait()
            self.io_loop.remove_handler(recv_side.fileno())
        finally:
            recv_side.close()
            send_side.close()
开发者ID:opendevops-cn,项目名称:opendevops,代码行数:22,代码来源:ioloop_test.py


示例8: socketpair

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
        """Emulate ``socket.socketpair()`` with a connection over "localhost".

        A temporary listening socket is bound to an ephemeral port, a client
        socket connects to it, and connections are accepted until the one
        whose peer address equals the client's own address is found.

        Returns ``(client_socket, accepted_socket)``.  If an ``OSError`` occurs
        after the client socket was created, that socket is closed and the
        error is re-raised.  The listening socket is always closed.
        """
        with contextlib.closing(socket.socket(family, type, proto)) as listener:
            listener.bind(("localhost", 0))
            listener.listen()
            client = socket.socket(family, type, proto)
            try:
                client.connect(listener.getsockname())
                own_address = client.getsockname()
                while True:
                    accepted, peer_address = listener.accept()
                    if peer_address != own_address:
                        # Not our client: drop it and keep waiting.
                        accepted.close()
                        continue
                    return client, accepted
            except OSError:
                client.close()
                raise
# TODO: write more tests. 
开发者ID:aliyun,项目名称:oss-ftp,代码行数:22,代码来源:test_ioloop.py


示例9: test__copy_eof_on_all

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test__copy_eof_on_all(self):
        """Exercise ``pty._copy`` when both master_fd and stdin hit EOF.

        ``pty``'s stdout/stdin descriptors and its ``select`` are replaced
        by test doubles, the writing ends are closed so that reads return
        empty, and ``_copy`` is expected to end with ``IndexError`` on the
        second select call.
        """
        _stdout_reader_fd, stdout_fd = self._pipe()
        pty.STDOUT_FILENO = stdout_fd
        stdin_fd, stdin_writer_fd = self._pipe()
        pty.STDIN_FILENO = stdin_fd

        master_sock, peer_sock = socket.socketpair()
        masters = [master_sock.fileno(), peer_sock.fileno()]
        self.fds.extend(masters)

        # Close every writing side so that the readers see EOF.
        os.close(masters[1])
        peer_sock.close()
        os.close(stdin_writer_fd)

        # Two select calls are expected; the last one causes IndexError.
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([stdin_fd, masters[0]])
        # Both fds should have been dropped from the fds list, since each
        # reached EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
开发者ID:aliyun,项目名称:oss-ftp,代码行数:26,代码来源:test_pty.py


示例10: __init__

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def __init__(self,
                 processor,
                 lsocket,
                 inputProtocolFactory=None,
                 outputProtocolFactory=None,
                 threads=10):
        """Record the server settings and set up its queue and socket pair.

        A falsy *inputProtocolFactory* is replaced by a new
        ``TBinaryProtocolFactory()``; a falsy *outputProtocolFactory* reuses
        whatever was chosen for input.  *threads* is passed through ``int``.
        ``self._read`` and ``self._write`` are the two ends of a socket pair.
        """
        self.processor = processor
        self.socket = lsocket

        in_factory = inputProtocolFactory
        if not in_factory:
            in_factory = TBinaryProtocolFactory()
        self.in_protocol = in_factory

        out_factory = outputProtocolFactory
        if not out_factory:
            out_factory = self.in_protocol
        self.out_protocol = out_factory

        self.threads = int(threads)
        self.clients = {}
        self.tasks = queue.Queue()

        read_end, write_end = socket.socketpair()
        self._read = read_end
        self._write = write_end

        self.prepared = False
        self._stop = False
开发者ID:Aditmadzs,项目名称:Protect4,代码行数:18,代码来源:TNonblockingServer.py


示例11: test_client_destroy_listener

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test_client_destroy_listener():
    """Only destroy listeners still attached may have fired after the client is destroyed.

    Two listeners are added to a client, the first one is removed again,
    and the client is destroyed.  The globals ``a`` and ``b`` (presumably
    set by ``destroy_notify_a`` / ``destroy_notify_b`` -- confirm against
    their definitions) must then be 0 and 1 respectively.
    """
    global a, b
    s1, s2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
    a = 0
    b = 0
    display = Display()
    client = Client(display, s1.fileno())

    destroy_listener_a = Listener(destroy_notify_a)
    destroy_listener_b = Listener(destroy_notify_b)
    client.add_destroy_listener(destroy_listener_a)
    client.add_destroy_listener(destroy_listener_b)
    destroy_listener_a.remove()

    client.destroy()
    try:
        assert a == 0
        assert b == 1
    finally:
        # Tear down the display and the peer socket as well, so the test
        # leaves nothing behind even when an assertion fails.
        display.destroy()
        s2.close()
开发者ID:flacjacket,项目名称:pywayland,代码行数:23,代码来源:test_client_destroy.py


示例12: test_create_resource

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test_create_resource():
    """Create a resource for a client and check version, lookup and user data."""
    server_end, peer_end = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
    display = Display()
    client = Client(display, server_end.fileno())

    # Create the resource and verify the requested version was applied.
    resource = WlDisplay.resource_class(client, version=4)
    assert resource.version == 4

    # Looking the object up by id on the client yields the same resource.
    assert client.get_object(resource.id) == resource

    # user_data is a plain read/write attribute on the client.
    client.user_data = 0xbee
    assert client.user_data == 0xbee

    client.destroy()
    display.destroy()
    peer_end.close()
开发者ID:flacjacket,项目名称:pywayland,代码行数:22,代码来源:test_resource.py


示例13: notest_create_resource_with_same_id

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def notest_create_resource_with_same_id():
    """Creating a second resource with an existing id should replace the first one."""
    server_end, peer_end = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
    display = Display()
    client = Client(display, server_end.fileno())

    # First resource: it is what the client returns for its id.
    first = WlDisplay.resource_class(client, version=2)
    assert client.get_object(first.id) == first

    # Second resource re-using the id: this should replace the old one.
    second = WlDisplay.resource_class(client, version=1, id=first.id)
    assert client.get_object(first.id) == second

    second.destroy()
    first.destroy()
    client.destroy()
    display.destroy()
    peer_end.close()
开发者ID:flacjacket,项目名称:pywayland,代码行数:22,代码来源:test_resource.py


示例14: _start

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        When *stdin* is ``subprocess.PIPE`` a socket pair from the loop is
        used instead of a pipe: one end becomes the child's stdin, and the
        other end is wrapped in a binary write file assigned to
        ``self._proc.stdin``.
        """
        parent_end = None
        if stdin == subprocess.PIPE:
            # A socket pair is used for stdin because not every platform
            # supports selecting read events on the write end of a socket
            # (which is how closing of the other end is detected).  This is
            # notably needed on AIX and works just fine elsewhere.
            stdin, parent_end = self._loop._socketpair()
            # The write end must not be inherited by the child; required
            # with close_fds=False on Python 3.3 and older (Python 3.4
            # implements PEP 446, so socketpair already returns
            # non-inheritable sockets).
            _set_inheritable(parent_end.fileno(), False)

        self._proc = subprocess.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            universal_newlines=False,
            bufsize=bufsize,
            **kwargs)

        if parent_end is None:
            return
        # Drop our copy of the child's end, then expose the parent end as a
        # file object that takes over the detached descriptor.
        stdin.close()
        self._proc.stdin = open(parent_end.detach(), 'wb', buffering=bufsize)
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:23,代码来源:unix_events.py


示例15: test__copy_eof_on_all

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test__copy_eof_on_all(self):
        """Check ``pty._copy`` when master_fd and stdin both return EOF.

        stdout/stdin descriptors and ``select`` inside ``pty`` are swapped
        for test doubles, all writing ends are closed, and ``_copy`` must
        finish with ``IndexError`` raised by the second select call.
        """
        _stdout_reader_fd, stdout_fd = self._pipe()
        pty.STDOUT_FILENO = stdout_fd
        stdin_fd, stdin_writer_fd = self._pipe()
        pty.STDIN_FILENO = stdin_fd

        master_sock, peer_sock = self._socketpair()
        masters = [master_sock.fileno(), peer_sock.fileno()]

        # Close the writing sides so every read yields EOF.
        os.close(masters[1])
        peer_sock.close()
        os.close(stdin_writer_fd)

        # Two select calls are expected; the last one causes IndexError.
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([stdin_fd, masters[0]])
        # By the second select call both fds should be gone from the fds
        # list, because each of them already encountered EOF.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:25,代码来源:test_pty.py


示例16: socketpair

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
        """Emulate ``socket.socketpair()`` using a connection to ``support.HOST``.

        A temporary listener is bound to an ephemeral port; a client socket
        connects to it and connections are accepted until the peer address
        matches the client's own address.

        Returns ``(client_socket, accepted_socket)``.  On ``OSError`` the
        client socket is closed and the error propagates.  The listener is
        closed when the ``with`` block exits.
        """
        with socket.socket(family, type, proto) as listener:
            listener.bind((support.HOST, 0))
            listener.listen()
            client = socket.socket(family, type, proto)
            try:
                client.connect(listener.getsockname())
                own_address = client.getsockname()
                while True:
                    accepted, peer_address = listener.accept()
                    if peer_address != own_address:
                        # A different client got in first; discard it.
                        accepted.close()
                        continue
                    return client, accepted
            except OSError:
                client.close()
                raise
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:19,代码来源:test_selectors.py


示例17: Pipe

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def Pipe(duplex=True):
        """Return two connection objects, one for each end of a new pipe.

        With *duplex* true the pipe is a blocking socket pair whose
        descriptors are detached and handed to the connections.  Otherwise
        ``os.pipe()`` is used, the first connection being created with
        ``writable=False`` and the second with ``readable=False``.
        """
        if not duplex:
            read_fd, write_fd = os.pipe()
            reader = Connection(read_fd, writable=False)
            writer = Connection(write_fd, readable=False)
            return reader, writer

        left, right = socket.socketpair()
        for sock in (left, right):
            sock.setblocking(True)
        # detach() hands the raw descriptor over to the Connection.
        conn_left = Connection(left.detach())
        conn_right = Connection(right.detach())
        return conn_left, conn_right
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:18,代码来源:connection.py


示例18: test_remove_handler_from_handler

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def test_remove_handler_from_handler(self):
        """A handler may remove another pending handler from inside its callback.

        Both ends of a socket pair become readable at the same time.  The
        callback reads from whichever end fires and removes the other end
        from the IOLoop, so exactly one chunk must have been received.
        """
        client, server = socket.socketpair()
        try:
            # Make both sockets readable simultaneously.
            for endpoint in (client, server):
                endpoint.send(b'abc')

            received = []

            def handle_read(fd, events):
                received.append(fd.recv(1024))
                # After reading from one fd, drop the opposite one.
                opposite = server if fd is client else client
                self.io_loop.remove_handler(opposite)

            for endpoint in (client, server):
                self.io_loop.add_handler(endpoint, handle_read, self.io_loop.READ)
            self.io_loop.call_later(0.03, self.stop)
            self.wait()

            # Only one fd was read; the other was cleanly removed.
            self.assertEqual(received, [b'abc'])
        finally:
            client.close()
            server.close()
# Deliberately not a subclass of AsyncTestCase so the IOLoop isn't
# automatically set as current. 
开发者ID:tao12345666333,项目名称:tornado-zh,代码行数:32,代码来源:ioloop_test.py


示例19: setUp

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def setUp(self):
        """Create a connected socket pair stored as ``self.serv`` and ``self.cli``."""
        pair = socket.socketpair()
        self.serv = pair[0]
        self.cli = pair[1]
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:4,代码来源:test_socket.py


示例20: check_sendall_interrupted

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def check_sendall_interrupted(self, with_timeout):
        """Check how ``sendall()`` behaves when a SIGALRM arrives while it runs.

        The test is skipped when ``signal.alarm`` or ``socket.socketpair``
        is unavailable.  First, with a handler that raises, the exception
        (``ZeroDivisionError``) must propagate out of ``sendall()``.  Then,
        only when *with_timeout* is true, a handler that returns normally is
        installed and ``sendall()`` must raise ``socket.timeout``.

        The previous SIGALRM handler is restored, any pending alarm is
        cancelled and both sockets are closed on exit.
        """
        # socketpair() is not strictly required, but it makes things easier.
        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
            self.skipTest("signal.alarm and socket.socketpair required for this test")
        # Our signal handlers clobber the C errno by calling a math function
        # with an invalid domain value.
        def ok_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
        def raising_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
            # Deliberately raise ZeroDivisionError from inside the handler.
            1 // 0
        c, s = socket.socketpair()
        # Remember the handler that was installed before, to restore it later.
        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
        try:
            if with_timeout:
                # Just above the one second minimum for signal.alarm
                c.settimeout(1.5)
            with self.assertRaises(ZeroDivisionError):
                signal.alarm(1)
                # NOTE(review): SOCK_MAX_SIZE is presumably large enough that
                # sendall() is still running when the alarm fires -- confirm.
                c.sendall(b"x" * test_support.SOCK_MAX_SIZE)
            if with_timeout:
                # Second phase: the handler returns normally this time.
                signal.signal(signal.SIGALRM, ok_handler)
                signal.alarm(1)
                self.assertRaises(socket.timeout, c.sendall,
                                  b"x" * test_support.SOCK_MAX_SIZE)
        finally:
            # Cancel any pending alarm before restoring the old handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_alarm)
            c.close()
            s.close()
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:32,代码来源:test_socket.py


示例21: testPair

# 需要导入模块: import socket [as 别名]
# 或者: from socket import socketpair [as 别名]
def testPair(self):
        """A kqueue watching both ends of a socket pair reports the pending data.

        Data is sent from ``a``; both sockets are registered for read
        events, and a single event is requested with a one second timeout.
        The event must not carry ``KQ_EV_ERROR`` and reading ``data`` bytes
        from ``b`` must return the payload.  The kqueue and both sockets are
        closed even when an assertion fails.
        """
        kq = select.kqueue()
        try:
            a, b = socket.socketpair()
            try:
                a.send(b'foo')
                read_flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
                event1 = select.kevent(a, select.KQ_FILTER_READ, read_flags)
                event2 = select.kevent(b, select.KQ_FILTER_READ, read_flags)
                # Register both events, wait for at most one result for up to 1s.
                r = kq.control([event1, event2], 1, 1)
                self.assertTrue(r)
                self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
                self.assertEqual(b.recv(r[0].data), b'foo')
            finally:
                a.close()
                b.close()
        finally:
            kq.close()
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:17,代码来源:test_kqueue.py



注:本文中的socket.socketpair方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。