Python csv.DictReader Method Code Examples

This article collects typical usage examples of the csv.DictReader method in Python. If you are wondering how to use csv.DictReader, how it works, or what it looks like in real code, the curated examples here may help. You can also explore further usage examples for the csv module that this method belongs to.

A total of 29 code examples of the csv.DictReader method are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
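
Before the project-specific examples, here is a minimal sketch of the basic pattern they all share: csv.DictReader maps each data row to a dict keyed by the header row (or by an explicit fieldnames list). The file name data.csv and its columns are hypothetical.

import csv

# Hypothetical input file with the header line: name,age
with open('data.csv', newline='') as f:
    reader = csv.DictReader(f)  # the first row supplies the field names
    for row in reader:
        # each row is a mapping, e.g. {'name': 'Ada', 'age': '36'}
        print(row['name'], int(row['age']))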

Example 1: ls

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def ls(args):
    bucket = resources.s3.Bucket(args.billing_reports_bucket.format(account_id=ARN.get_account_id()))
    now = datetime.utcnow()
    year = args.year or now.year
    month = str(args.month or now.month).zfill(2)
    next_year = year + (args.month or now.month) // 12
    next_month = str((args.month or now.month) % 12 + 1).zfill(2)
    manifest_name = "aegea/{report}/{yr}{mo}01-{next_yr}{next_mo}01/{report}-Manifest.json"
    manifest_name = manifest_name.format(report=__name__, yr=year, mo=month, next_yr=next_year, next_mo=next_month)
    try:
        manifest = json.loads(bucket.Object(manifest_name).get().get("Body").read())
        for report_key in manifest["reportKeys"]:
            report = BytesIO(bucket.Object(report_key).get().get("Body").read())
            with gzip.GzipFile(fileobj=report) as fh:
                reader = csv.DictReader(fh)
                for line in reader:
                    page_output(tabulate(filter_line_items(reader, args), args))
    except ClientError as e:
        msg = 'Unable to get report {} from {}: {}. Run "aegea billing configure" to enable reports.'
        raise AegeaException(msg.format(manifest_name, bucket, e)) 
Author: kislyuk, Project: aegea, Lines: 22, Source file: billing.py
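
As written, Example 1 hands the binary GzipFile object straight to csv.DictReader; on Python 3, DictReader expects a text stream, so a text wrapper is usually needed. A minimal Python 3 sketch of the same idea, assuming a hypothetical local file sample.csv.gz:

import csv
import gzip
import io

# Hypothetical gzipped CSV; wrap the binary stream in a text layer for DictReader.
with gzip.open('sample.csv.gz', 'rb') as gz:
    with io.TextIOWrapper(gz, encoding='utf-8') as text_stream:
        for row in csv.DictReader(text_stream):
            print(row)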


Example 2: test_csv_response

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def test_csv_response(self, testapp):
        # create a department and an LMPD uof incident
        department = Department.create(name="LM Police Department", short_name="LMPD", load_defaults=False)
        uof_check = dict(
            department_id=department.id, opaque_id="Check Opaque ID",
            occured_date=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            bureau="Check Bureau", division="Check Division", unit="Check Unit", platoon="Check Platoon",
            disposition="Check Disposition", use_of_force_reason="Check UOF Reason",
            officer_force_type="Check Officer Force Type", service_type="Check Service Type",
            arrest_made=False, arrest_charges="Check Arrest Charges",
            resident_injured=True, resident_hospitalized=False, resident_condition="Check Resident Condition",
            officer_injured=False, officer_hospitalized=False, officer_condition="Check Officer Condition",
            resident_identifier="Check Resident Identifier", resident_race="Check Resident Race",
            resident_sex="Check Resident Sex", resident_age="Check Resident Age",
            officer_race="Check Officer Race", officer_sex="Check Officer Sex", officer_age="Check Officer Age",
            officer_years_of_service="Check Officer Years Of Service", officer_identifier="Check Officer Identifier")
        UseOfForceIncidentLMPD.create(**uof_check)
        response = testapp.get("/department/{}/uof.csv".format(department.id))
        incidents = list(csv.DictReader(io.StringIO(response.text)))
        # build a variable to csv header lookup from the csv schema
        csv_schema = UseOfForceIncidentLMPD.get_csv_schema()
        schema_lookup = dict(zip([col[1] for col in csv_schema], [col[0] for col in csv_schema]))
        assert len(incidents) == 1
        for check_key in uof_check.keys():
            if check_key == 'department_id':
                continue
            assert str(uof_check[check_key]) == incidents[0][schema_lookup[check_key]] 
Author: codeforamerica, Project: comport, Lines: 23, Source file: test_department_model_lmpd.py


Example 3: test_csv_filtered_by_dept

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def test_csv_filtered_by_dept(self, testapp):
        # create a department
        department1 = Department.create(name="IM Police Department", short_name="IMPD", load_defaults=False)
        department2 = Department.create(name="B Police Department", short_name="BPD", load_defaults=False)
        incidentclass1 = getattr(importlib.import_module("comport.data.models"), "UseOfForceIncident{}".format(department1.short_name))
        incidentclass2 = getattr(importlib.import_module("comport.data.models"), "UseOfForceIncident{}".format(department2.short_name))
        incidentclass1.create(opaque_id="123ABC", department_id=department1.id)
        incidentclass2.create(opaque_id="123XYZ", department_id=department2.id)
        response1 = testapp.get("/department/{}/uof.csv".format(department1.id))
        response2 = testapp.get("/department/{}/uof.csv".format(department2.id))
        incidents1 = list(csv.DictReader(io.StringIO(response1.text)))
        incidents2 = list(csv.DictReader(io.StringIO(response2.text)))
        assert len(incidents1) == 1 and len(incidents2) == 1
        assert incidents1[0]['id'] == '123ABC' and incidents2[0]['id'] == '123XYZ' 
Author: codeforamerica, Project: comport, Lines: 21, Source file: test_functional.py


Example 4: clean

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def clean(self):
        if self.batch_file and self.batch_file.file:
            csvfile = csv.DictReader(self.batch_file.file, delimiter="\t")
            row = next(csvfile)
            for field in self.core_fields:
                if field not in row.keys():
                    raise ValidationError('CSV File does not have the necessary field: '+ field)
            uris = []
            for row in csvfile:
                fcode = row.get("FEATURE_CODE")
                if not fcode:
                    raise ValidationError("A Feature code is missing")
                uri = row.get("URIS").split("|")[0]
                if not uri:
                    raise ValidationError('CSV file is missing a uri')
                if uri in uris:
                    raise ValidationError('duplicate URI detected')
                uris.append(uri)
Author: LibraryOfCongress, Project: gazetteer, Lines: 21, Source file: models.py


Example 5: get_msr_paraphrase

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def get_msr_paraphrase() -> Dict[str, List[Dict[str, str]]]:
    url = 'https://raw.githubusercontent.com/wasiahmad/paraphrase_identification/master/dataset/msr-paraphrase-corpus/msr_paraphrase_{}.txt'  # NOQA
    root = download.get_cache_directory(os.path.join('datasets', 'msr_paraphrase'))
    def creator(path):
        dataset = {}
        fieldnames = ('quality', 'id1', 'id2', 'string1', 'string2')
        for split in ('train', 'test'):
            data_path = gdown.cached_download(url.format(split))
            with io.open(data_path, 'r', encoding='utf-8') as f:
                f.readline()  # skip header
                reader = csv.DictReader(f, delimiter='\t', fieldnames=fieldnames)
                dataset[split] = [dict(row) for row in reader]
        with io.open(path, 'wb') as f:
            pickle.dump(dataset, f)
        return dataset
    def loader(path):
        with io.open(path, 'rb') as f:
            return pickle.load(f)
    pkl_path = os.path.join(root, 'msr_paraphrase.pkl')
    return download.cache_or_load_file(pkl_path, creator, loader) 
Author: tofunlp, Project: lineflow, Lines: 27, Source file: msr_paraphrase.py


Example 6: query_landsat_catalogue

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def query_landsat_catalogue(collection_file, cc_limit, date_start, date_end, wr2path, wr2row,
                            sensor, latest=False):
    """Query the Landsat index catalogue and retrieve urls for the best images found."""
    print("Searching for Landsat-{} images in catalog...".format(sensor))
    cc_values = []
    all_urls = []
    all_acqdates = []
    with open(collection_file) as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            year_acq = int(row['DATE_ACQUIRED'][0:4])
            month_acq = int(row['DATE_ACQUIRED'][5:7])
            day_acq = int(row['DATE_ACQUIRED'][8:10])
            acqdate = datetime.datetime(year_acq, month_acq, day_acq)
            if int(row['WRS_PATH']) == int(wr2path) and int(row['WRS_ROW']) == int(wr2row) \
                    and row['SENSOR_ID'] == sensor and float(row['CLOUD_COVER']) <= cc_limit \
                    and date_start < acqdate < date_end:
                all_urls.append(row['BASE_URL'])
                cc_values.append(float(row['CLOUD_COVER']))
                all_acqdates.append(acqdate)
    if latest and all_urls:
        return [sort_url_list(cc_values, all_acqdates, all_urls).pop()]
    return sort_url_list(cc_values, all_acqdates, all_urls) 
Author: vascobnunes, Project: fetchLandsatSentinelFromGoogleCloud, Lines: 26, Source file: fels.py


Example 7: query_sentinel2_catalogue

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def query_sentinel2_catalogue(collection_file, cc_limit, date_start, date_end, tile, latest=False):
    """Query the Sentinel-2 index catalogue and retrieve urls for the best images found."""
    print("Searching for Sentinel-2 images in catalog...")
    cc_values = []
    all_urls = []
    all_acqdates = []
    with open(collection_file) as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            year_acq = int(row['SENSING_TIME'][0:4])
            month_acq = int(row['SENSING_TIME'][5:7])
            day_acq = int(row['SENSING_TIME'][8:10])
            acqdate = datetime.datetime(year_acq, month_acq, day_acq)
            if row['MGRS_TILE'] == tile and float(row['CLOUD_COVER']) <= cc_limit \
                    and date_start < acqdate < date_end:
                all_urls.append(row['BASE_URL'])
                cc_values.append(float(row['CLOUD_COVER']))
                all_acqdates.append(acqdate)
    if latest and all_urls:
        return [sort_url_list(cc_values, all_acqdates, all_urls).pop()]
    return sort_url_list(cc_values, all_acqdates, all_urls) 
Author: vascobnunes, Project: fetchLandsatSentinelFromGoogleCloud, Lines: 24, Source file: fels.py


Example 8: find_best_hyper

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def find_best_hyper(dataset, model, metric="test_rmse"):
    path = "../../result/{}/{}/".format(model, dataset)
    # Get list of hyperparameters
    names, losses, stds = [], [], []
    for root, dirs, files in walk_level(path, level=0):
        for dir_name in dirs:
            loss = []
            if os.path.isfile(path + dir_name + "/results.csv"):
                with open(path + dir_name + "/results.csv") as file:
                    reader = csv.DictReader(file)
                    for row in reader:
                        loss.append(row[metric])
                names.append(dir_name)
                losses.append(float(loss[0]))
                stds.append(float(loss[1]))
    # Sort by loss
    losses, stds, names = zip(*sorted(zip(losses, stds, names)))
    # Choose lowest loss hyper
    path += names[np.argmin(losses)] + '/'
    return path 
Author: blackmints, Project: 3DGCN, Lines: 27, Source file: scatter_plot.py


Example 9: write

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def write(self, ordered_dict):
        '''
        write an entry
        :param ordered_dict: something like {'name':'exp1', 'acc':90.5, 'epoch':50}
        :return:
        '''
        if os.path.exists(self.filename) == False:
            headers = list(ordered_dict.keys())
            prev_rec = None
        else:
            with open(self.filename) as f:
                reader = csv.DictReader(f)
                headers = reader.fieldnames
                prev_rec = [row for row in reader]
            headers = self.merge_headers(headers, list(ordered_dict.keys()))
        with open(self.filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, headers)
            writer.writeheader()
            if not prev_rec == None:
                writer.writerows(prev_rec)
            writer.writerow(ordered_dict) 
Author: ChrisWu1997, Project: 2D-Motion-Retargeting, Lines: 24, Source file: utils.py


Example 10: csv_to_deck

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def csv_to_deck(csv_path):
    """Creates a Tinycards deck from a CSV file.
    The CSV file is expected to have two columns named 'front' and 'back'.
    """
    # Create new deck.
    tinycards = Tinycards(user_identifier, user_password)
    deck = Deck('French Words')
    deck = tinycards.create_deck(deck)
    # Extract data from CSV file.
    word_pairs = []
    with open(csv_path, 'r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        for row in csv_reader:
            current_word_pair = (row['front'], row['back'])
            word_pairs.append(current_word_pair)
    # Populate deck with cards from CSV data.
    for pair in word_pairs:
        deck.add_card(pair)
    # Save changes to Tinycards.
    tinycards.update_deck(deck) 
Author: floscha, Project: tinycards-python-api, Lines: 26, Source file: csv_to_deck.py


Example 11: add_cards_from_csv

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def add_cards_from_csv(self, csv_file,
                           front_column='front',
                           back_column='back'):
        """Add word pairs from a CSV file as cards to the deck.
        Args:
            csv_file: The file buffer that contains the CSV data.
            front_column (str): Optional name for the 'front' column.
            back_column (str): Optional name for the 'back' column.
        Example:
            >>> with open(csv_path, 'r') as csv_file:
            >>>     deck.add_cards_from_csv(csv_file)
        """
        csv_reader = csv.DictReader(csv_file)
        for row in csv_reader:
            current_word_pair = (row[front_column], row[back_column])
            self.add_card(current_word_pair) 
Author: floscha, Project: tinycards-python-api, Lines: 21, Source file: deck.py


Example 12: import_business_data_relations

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def import_business_data_relations(self, data_source):
        """
        Imports business data relations
        """
        if isinstance(data_source, str):
            data_source = [data_source]
        for path in data_source:
            if os.path.isabs(path):
                if os.path.isfile(os.path.join(path)):
                    relations = csv.DictReader(open(path, "r"))
                    RelationImporter().import_relations(relations)
                else:
                    utils.print_message("No file found at indicated location: {0}".format(path))
                    sys.exit()
            else:
                utils.print_message(
                    "ERROR: The specified file path appears to be relative. \
                    Please rerun command with an absolute file path."
                )
                sys.exit() 
Author: archesproject, Project: arches, Lines: 23, Source file: packages.py


Example 13: insert_question_to_sqlt

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def insert_question_to_sqlt():
    question_set = []
    last_question = ""
    with open(CORPUS_DIR+"/"+WIKI_QA_TSV) as file:
        wiki_file = csv.DictReader(file, dialect='excel-tab')
        if wiki_file is not None:
            for row in wiki_file:
                if row['Question'] != last_question:
                    question = (row['Question'], )
                    question_set.append(question)
                    last_question = row['Question']
    if question_set is not None:
        sqlt_man = SqLiteManager()
        # sqlt_man.remove_old_results()
        sqlt_man.remove_all_data()
        logger.info("Removed Old test results")
        sqlt_man.insert_many_question(question_set)
        logger.info("Inserted {0} questions".format(sqlt_man.get_question_count())) 
Author: 5hirish, Project: adam_qas, Lines: 21, Source file: create_test_data.py


Example 14: generate_modified_file

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def generate_modified_file(src, dst, sample, corrupt):
    """원본 파일을 샘플링하고 결측치 넣은 새 파일 생성"""
    # 랜덤 시드 고정. 매번 동일한 결과가 보장되도록.
    random.seed(0)
    with open(src, 'r') as fr:
        with open(dst, 'w') as fw:
            csvr = csv.DictReader(fr)
            csvw = csv.DictWriter(fw, csvr.fieldnames)
            csvw.writeheader()
            rows = csvr
            # Sampling
            if sample:
                rows = (row for row in rows if random.random() <= SAMPLE_RATE)
            # Add missing values
            if corrupt:
                rows = (corrupt_row(row) for row in rows)
            csvw.writerows(rows) 
Author: akngs, Project: petitions, Lines: 24, Source file: petition.py


Example 15: handle

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def handle(self, *args, **options):
        # Get the only instance of Magazine Department Index Page
        magazine_department_index_page = MagazineDepartmentIndexPage.objects.get()
        with open(options["file"]) as import_file:
            departments = csv.DictReader(import_file)
            for department in departments:
                import_department = MagazineDepartment(
                    title=department["title"],
                )
                # Add department to site page hierarchy
                magazine_department_index_page.add_child(instance=import_department)
                magazine_department_index_page.save()
        self.stdout.write("All done!") 
Author: WesternFriend, Project: WF-website, Lines: 19, Source file: import_departments.py


Example 16: credential_report

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def credential_report(self):
        if "credential_report" not in self.cache:
            iam = clients.iam
            iam.generate_credential_report()
            while True:
                try:
                    self.cache["credential_report"] = iam.get_credential_report()
                    break
                except ClientError as e:
                    expect_error_codes(e, "ReportInProgress")
        return csv.DictReader(self.cache["credential_report"]["Content"].decode("utf-8").splitlines()) 
Author: kislyuk, Project: aegea, Lines: 13, Source file: audit.py


Example 17: load_csv

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def load_csv(fp_in, delimiter=',', quotechar='"', remove_empty=False,
        custom_headers=None, **kwargs):
    r = csv.DictReader(fp_in, delimiter=delimiter, quotechar=quotechar,
            fieldnames=custom_headers)
    rows = [row_dct for row_dct in r]
    if remove_empty:
        rows = [dict([(k, item) for k, item in row.items() if item]) for row in rows]
    return rows 
Author: oplatek, Project: csv2json, Lines: 10, Source file: __init__.py


Example 18: __init__

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def __init__(self, f, encoding="utf-8", fieldnames=None, **kwargs):
        csv.DictReader.__init__(self, f, fieldnames=fieldnames, **kwargs)
        self.reader = UnicodeCsvReader(f, encoding=encoding, **kwargs) 
Author: fpsw, Project: Servo, Lines: 5, Source file: ucsv.py


Example 19: load_examples

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def load_examples(tmp_dir, prop_train=0.09, prop_val=0.01):
  """Loads exampls from the tsv file.
  Args:
    tmp_dir: temp directory.
    prop_train: proportion of the train data
    prop_val: proportion of the validation data
  Returns:
    All examples in the dataset plus train, test, and development splits.
  """
  infile = generator_utils.maybe_download(tmp_dir, _TAR, _URL)
  tf.logging.info('Loading examples')
  all_examples = []
  for i, d in enumerate(csv.DictReader(gzip.open(infile), delimiter='\t')):
    if i % 100000 == 0:
      tf.logging.info('%d examples have been loaded....' % i)
    ex = {x: int(y) if y.isdigit() else y for x, y in d.items()}
    all_examples.append(ex)
  random.seed(1)
  random.shuffle(all_examples)
  n_train = int(len(all_examples) * prop_train)
  n_val = n_train + int(len(all_examples) * prop_val)
  train = all_examples[:n_train]
  val = all_examples[n_train:n_val]
  test = []
  for e in all_examples[n_val:]:
    if e['n_intervening'] == e['n_diff_intervening']:
      test.append(e)
  return all_examples, train, val, test 
Author: akzaidi, Project: fine-lm, Lines: 37, Source file: subject_verb_agreement.py


Example 20: __read_countries_file

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def __read_countries_file(self):
        """Read countries from a CSV file"""
        import csv
        import pkg_resources
        filename = pkg_resources.resource_filename('sortinghat', 'data/countries.csv')
        with open(filename, 'r') as f:
            reader = csv.DictReader(f, fieldnames=['name', 'code', 'alpha3'])
            countries = [Country(**c) for c in reader]
        return countries 
Author: chaoss, Project: grimoirelab-sortinghat, Lines: 14, Source file: init.py


Example 21: parseResults

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def parseResults(self, text):
        import csv
        l = []
        files = self.getCSVFiles(text)
        for filename in files:
            with open(self._prepend_working_dir(filename), 'r') as csvfile:
                csvReader = csv.DictReader(csvfile, fieldnames=self.colnames, **self.kwargs)
                l = l + [r for r in (self.filter_fn(x) for x in csvReader) if r]
        return l 
Author: graalvm, Project: mx, Lines: 11, Source file: mx_benchmark.py


Example 22: load_vo_csvfile

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def load_vo_csvfile(filename, unit_map):
    r"""Reads a CSV file and returns a two-level dict mapping hostnames + fields to values.
    Skips initial lines beginning with `#'.
    Map to a value using a construction like, for example:
       d['Wolf 1061'][1]['pl_orbeccen']
    This is the orbital eccentricity of a Wolf 1061 planet, the second one in the catalog.
    If the value was not given in the catalog, the value in the dictionary structure
    is set up as None.
    """
    # basic data structure: a dictionary containing lists (lists of further
    # dictionaries, to be precise).
    d = defaultdict(list)
    with open(filename, 'r') as csvfile:
        # skip lines starting with #
        pos = csvfile.tell()
        while csvfile.readline().startswith('#'):
            pos = csvfile.tell()
        csvfile.seek(pos)
        # read the file sequentially by row
        reader = csv.DictReader(csvfile)
        for row in reader:
            # remap each "value" v of row, which is a dict, through the appropriate
            # function in unit_map above -- but map empty strings to None
            row_remap = {k:(unit_map[k](v) if v else None) for (k,v) in row.items()}
            # Append an entry to the list held within one slot of "d",
            # using the "hostname" as key.
            d[row['pl_hostname']].append(row_remap)
    return d 
Author: dsavransky, Project: EXOSIMS, Lines: 32, Source file: Utilities.py


Example 23: _ParseScanReportCSV

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def _ParseScanReportCSV(body, content_type):
        if content_type != 'Content-Type: text/csv; name=report.csv':
            raise ValueError("Invalid content type")
        csv_ = base64.urlsafe_b64decode(body).decode('utf8')
        report_data = csv.DictReader(io.StringIO(csv_))
        return report_data
    #
    # The following functions implement the Role Management API:
    # ========================================================= 
Author: rapid7, Project: nexpose-client-python, Lines: 12, Source file: nexpose.py


Example 24: test_csv_export

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def test_csv_export(self):
        import csv
        #settings.DEBUG = True
        resp = self.c.get('/1.0/place/search.json?q=Wabash%20Municipal&format=csv')
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp['Content-Type'], 'text/csv')
        csvfile = csv.DictReader(resp, fieldnames=Place.CSV_FIELDNAMES, delimiter="\t")
        row = next(csvfile)  # header row
        places = []
        for row in csvfile:
            places.append(row)
        self.assertEqual(True, "Wabash Municipal" in places[0]["NAME"])
        #settings.DEBUG = False 
Author: LibraryOfCongress, Project: gazetteer, Lines: 16, Source file: tests.py


Example 25: import_privileges_to_db

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def import_privileges_to_db(db_conn, csv_path):
        plugin_name = csv_path.split('/')[-2]
        # InsertDB.insert_new_plugin(db_conn, plugin_name=plugin_name)
        with open(csv_path, mode='r') as csv_file:
            csvr = DictReader(csv_file)
            for i, row in enumerate(csvr):
                try:
                    InsertDB.insert_new_command(db_conn, plugin_name=plugin_name, command_name=row['command'],
                                                permission_level=int(row['level']), ignore_file_save=True)
                except Error:
                    dprint("Encountered an error while importing plugin privileges data into the database.")
                    log(WARNING, "Encountered an error while importing plugin privileges data into the database.",
                        origin=L_DATABASE)
                    continue 
Author: DuckBoss, Project: JJMumbleBot, Lines: 16, Source file: database_utils.py


Example 26: import_aliases_to_db

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def import_aliases_to_db(db_conn, csv_path):
        file_name = csv_path.split('/')[-2]
        with open(csv_path, mode='r') as csv_file:
            csvr = DictReader(csv_file)
            for i, row in enumerate(csvr):
                try:
                    InsertDB.insert_new_alias(db_conn, alias_name=row['alias'].strip(), commands=row['command'].strip(), ignore_file_save=True)
                except Error:
                    dprint(
                        f"Encountered an error while importing a plugin alias from {file_name} plugin into the database.")
                    log(WARNING,
                        "Encountered an error while importing a plugin alias from {file_name} plugin into the database.",
                        origin=L_DATABASE)
                    continue 
Author: DuckBoss, Project: JJMumbleBot, Lines: 16, Source file: database_utils.py


Example 27: read_result_data

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def read_result_data(self):
        while True:
            line = self.file.readline()
            if 'START_CSV_SECTION' in line:
                break
            self.meta.append(line)
        reader = csv.DictReader(self.file)
        data = [_ for _ in reader]
        return data 
Author: AuCson, Project: SEDST, Lines: 11, Source file: metric.py


Example 28: _get_species_data

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def _get_species_data():
    csv_lines = resource_string(__name__, 'species-data.csv').split(b'\n')
    if sys.version_info[0] > 2:
        csv_lines = [l.decode() for l in csv_lines]
    reader = csv.DictReader(csv_lines, quoting=csv.QUOTE_NONNUMERIC)
    species_data = {row['symbol']: row for row in reader}
    return species_data 
Author: jhrmnn, Project: pyberny, Lines: 9, Source file: species_data.py


Example 29: import_players

# Required module: import csv [as alias]
# Or: from csv import DictReader [as alias]
def import_players(self):
        players = []
        with open(self.filename, 'r') as csvfile:
            csv_data = csv.DictReader(csvfile, skipinitialspace=True)
            for row in csv_data:
                game_info = None
                try:
                    away_team, home_team = row.get('Game', '').split('@')
                    game_info = GameInfo(home_team, away_team, None, False)
                except ValueError:
                    pass
                try:
                    player = Player(
                        row['Id'],
                        row['First Name'],
                        row['Last Name'],
                        row['Position'].split('/'),
                        row['Team'],
                        float(row['Salary']),
                        float(row['FPPG']),
                        is_injured=True if row['Injury Status'].strip() else False,
                        game_info=game_info,
                        **self.get_player_extra(row)
                    )
                except KeyError:
                    raise LineupOptimizerIncorrectCSV
                players.append(player)
        return players 
Author: DimaKudosh, Project: pydfs-lineup-optimizer, Lines: 30, Source file: importer.py



Note: The csv.DictReader examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs, and the snippets were selected from open-source projects contributed by various developers. Copyright of the source code remains with the original authors; please refer to each project's License before distributing or using the code, and do not republish without permission.