Python socket.ntohl方法代码示例

本文整理汇总了Python中socket.ntohl方法的典型用法代码示例。如果您正苦于以下问题:Python socket.ntohl方法的具体用法?Python socket.ntohl怎么用?Python socket.ntohl使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块socket的用法示例。

在下文中一共展示了socket.ntohl方法的21个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: get_netmask

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def get_netmask(self):
        """
        Get ip network netmask
        """
        if not CTYPES_SUPPORT:
            raise exceptions.TestSkipError("Getting the netmask requires "
                                           "python > 2.4")
        ifreq = struct.pack('16sH14s', self.name.encode(),
                            socket.AF_INET, b'\x00' * 14)
        try:
            res = fcntl.ioctl(sockfd, arch.SIOCGIFNETMASK, ifreq)
        except IOError:
            return 0
        netmask = socket.ntohl(struct.unpack('16sH2xI8x', res)[2])
        return 32 - int(math.log(ctypes.c_uint32(~netmask).value + 1, 2)) 
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:18,代码来源:utils_net.py


示例2: get_orig_dst

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def get_orig_dst(sock):
    own_addr = sock.getsockname()[0]
    own_af = detect_af(own_addr)
    if own_af == socket.AF_INET:
        buf = sock.getsockopt(socket.SOL_IP, constants.SO_ORIGINAL_DST, sockaddr_size)
        sa = sockaddr_in.from_buffer_copy(buf)
        addr = socket.ntohl(sa.sin_addr)
        addr = str(addr >> 24) + '.' + str((addr >> 16) & 0xFF) + '.' + str((addr >> 8) & 0xFF) + '.' + str(addr & 0xFF)
        port = socket.ntohs(sa.sin_port)
        return addr, port
    elif own_af == socket.AF_INET6:
        buf = sock.getsockopt(constants.SOL_IPV6, constants.SO_ORIGINAL_DST, sockaddr6_size)
        sa = sockaddr_in6.from_buffer_copy(buf)
        addr = socket.inet_ntop(socket.AF_INET6, sa.sin6_addr)
        port = socket.ntohs(sa.sin_port)
        return addr, port
    else:
        raise RuntimeError("Unknown address family!") 
开发者ID:Snawoot,项目名称:rsp,代码行数:20,代码来源:transparentlistener.py


示例3: receive

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def receive(channel):
    """ Receive a message from a channel """
    size = struct.calcsize("L")
    size = channel.recv(size)
    try:
        size = socket.ntohl(struct.unpack("L", size)[0])
    except struct.error as e:
        return ''
    buf = ""
    while len(buf) < size:
        buf = channel.recv(size - len(buf))
    return pickle.loads(buf)[0] 
开发者ID:PacktPublishing,项目名称:Software-Architecture-with-Python,代码行数:18,代码来源:communication.py


示例4: _decrypt

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def _decrypt(self, text, _id, exception=None):
        text = to_binary(text)
        plain_text = self.cipher.decrypt(base64.b64decode(text))
        padding = plain_text[-1]
        content = plain_text[16:-padding]
        xml_length = socket.ntohl(struct.unpack(b"I", content[:4])[0])
        xml_content = to_text(content[4 : xml_length + 4])
        from_id = to_text(content[xml_length + 4 :])
        if from_id != _id:
            exception = exception or Exception
            raise exception()
        return xml_content 
开发者ID:wechatpy,项目名称:wechatpy,代码行数:14,代码来源:base.py


示例5: decrypt

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def decrypt(self, text, appid):
        """对解密后的明文进行补位删除
        @param text: 密文
        @return: 删除填充补位后的明文
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # 使用BASE64对密文进行解码,然后AES-CBC解密
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_DecryptAES_Error, None
        try:
            # pad = ord(plain_text[-1])
            pad = plain_text[-1]
            # 去掉补位字符串
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # 去除16位随机字符串
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_appid = content[xml_len + 4:]
        except Exception as e:
            return ierror.WXBizMsgCrypt_IllegalBuffer, None
        if from_appid != appid:
            return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
        return 0, xml_content.decode() 
开发者ID:EvilPsyCHo,项目名称:TaskBot,代码行数:30,代码来源:WXBizMsgCrypt_py3.py


示例6: ntoh

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def ntoh(value):
    size = value.obj_size
    if size == 2:
        return socket.ntohs(value.v())
    elif size == 4:
        return socket.ntohl(value.v())
    from rekall import obj
    return obj.NoneObject("Not a valid integer") 
开发者ID:google,项目名称:rekall,代码行数:11,代码来源:utils.py


示例7: reverse

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def reverse(self, val):
        if self._size == 16:
            val = socket.ntohs(val)
        elif self._size == 32:
            val = socket.ntohl(val)
        return val 
开发者ID:DinoTools,项目名称:dionaea,代码行数:8,代码来源:fieldtypes.py


示例8: testNtoH

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)
            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34) 
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:15,代码来源:test_socket.py


示例9: testNtoHErrors

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k) 
开发者ID:IronLanguages,项目名称:ironpython2,代码行数:15,代码来源:test_socket.py


示例10: Ip2Int

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def Ip2Int(ip):
	return socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0]) 
开发者ID:euphrat1ca,项目名称:fuzzdb-collect,代码行数:4,代码来源:GetJapIP.py


示例11: reverse

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val 
开发者ID:medbenali,项目名称:CyberScan,代码行数:8,代码来源:fields.py


示例12: _split_addrport

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def _split_addrport(self, addrtext):
        addr_s, port_s = addrtext.split(":")
        return ntohl(int(addr_s, 16)), int(port_s, 16) 
开发者ID:kdart,项目名称:pycopia,代码行数:5,代码来源:netstat.py


示例13: update

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def update(self, filt=None):
        """update([filter])
Update the RouteTable with current values. If a filter function is supplied
then it will be called with a RouteEntry and must return a true or false value.
If true, the RouterEntry will be included in the table. If false, it will not.
"""
        self._entries = []
        lines = open(FILE).readlines()
        for line in lines[1:]:
            [iface, dest, gateway, flags, refcnt, use, metric, mask,
                mtu, window, irtt] = line.split()
            rt = RouteEntry(iface,
                    Destination=ntohl(int(dest, 16)),
                    Gateway=ntohl(int(gateway, 16)),
                    Flags=RouteFlags(flags),
                    RefCnt=int(refcnt, 16),
                    Use=int(use, 16),
                    Metric=int(metric, 16),
                    Mask=ntohl(int(mask, 16)),
                    MTU=int(mtu, 16),
                    Window=int(window, 16),
                    IRTT=int(irtt, 16))
            if filt:
                if filt(rt):
                    self._entries.append(rt)
            else:
                self._entries.append(rt) 
开发者ID:kdart,项目名称:pycopia,代码行数:29,代码来源:route.py


示例14: _parse_l3_proto

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def _parse_l3_proto(self, event):
        if event.l3_protocol == 0x0008:  # IPv4
            saddr = ipaddress.IPv4Address(socket.ntohl(event.addrs.v4.saddr))
            daddr = ipaddress.IPv4Address(socket.ntohl(event.addrs.v4.daddr))
        elif event.l3_protocol == 0xDD86:  # IPv6
            saddr = ipaddress.IPv6Address(bytes(event.addrs.v6.saddr))
            daddr = ipaddress.IPv6Address(bytes(event.addrs.v6.daddr))
        else:
            print(f"Unsupported l3 protocol {event.l3_protocol}")
            exit(1)
        return (str(saddr), str(daddr)) 
开发者ID:YutaroHayakawa,项目名称:ipftrace,代码行数:14,代码来源:ipftrace.py


示例15: reverse

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def reverse(self, val):
        if self.size == 16:
            # Replaces socket.ntohs (but work on both little/big endian)
            val = struct.unpack('>H', struct.pack('<H', int(val)))[0]
        elif self.size == 32:
            # Same here but for socket.ntohl
            val = struct.unpack('>I', struct.pack('<I', int(val)))[0]
        return val 
开发者ID:secdev,项目名称:scapy,代码行数:10,代码来源:fields.py


示例16: testNtoH

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)
            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34) 
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:15,代码来源:test_socket.py


示例17: testNtoHErrors

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k) 
开发者ID:Microvellum,项目名称:Fluid-Designer,代码行数:15,代码来源:test_socket.py


示例18: nfq_callback

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def nfq_callback(qh, unused_nfmsg, nfad, unused_data):
  packet = nfq.nfq_get_msg_packet_hdr(nfad).contents
  packet_id = socket.ntohl(packet.packet_id)
  payload_pointer = ctypes.c_void_p()
  size = nfq.nfq_get_payload(nfad, ctypes.byref(payload_pointer))
  payload = ctypes.string_at(payload_pointer, size)
  packet = Packet(packet_id, size, payload, qh)
  py_callbacks[qh](packet)
  return 0
# Maps queue handles to user-specified callbacks. 
开发者ID:google,项目名称:packet-queue,代码行数:15,代码来源:libnetfilter_queue.py


示例19: get_netmask

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def get_netmask(self):
        ifreq = struct.pack('16sH14s', self.name, AF_INET, '\x00'*14)
        try:
            res = fcntl.ioctl(sockfd, SIOCGIFNETMASK, ifreq)
        except IOError:
            return 0
        netmask = socket.ntohl(struct.unpack('16sH2xI8x', res)[2])
        return 32 - int(math.log(ctypes.c_uint32(~netmask).value + 1, 2)) 
开发者ID:omererdem,项目名称:honeything,代码行数:11,代码来源:ifconfig.py


示例20: decrypt

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def decrypt(self, text, appid):
        """对解密后的明文进行补位删除
        @param text: 密文
        @return: 删除填充补位后的明文
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # 使用BASE64对密文进行解码,然后AES-CBC解密
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception:
            return WXBizMsgCrypt_DecryptAES_Error, None
        try:
            pad = plain_text[-1]
            # 去掉补位字符串
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # 去除16位随机字符串
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack(b"I", content[:4])[0])
            xml_content = content[4:xml_len+4]
            from_appid = smart_bytes(content[xml_len+4:])
        except Exception:
            return WXBizMsgCrypt_IllegalBuffer, None
        if from_appid != smart_bytes(appid):
            return WXBizMsgCrypt_ValidateAppid_Error, None
        return 0, xml_content 
开发者ID:ScottAI,项目名称:-Odoo---,代码行数:28,代码来源:WXBizMsgCrypt.py


示例21: __toh

# 需要导入模块: import socket [as 别名]
# 或者: from socket import ntohl [as 别名]
def __toh(self):
        return socket.ntohl(self._int) 
开发者ID:ActiveState,项目名称:code,代码行数:4,代码来源:recipe-577191.py



注:本文中的socket.ntohl方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。