Python fcntl.LOCK_NB属性代码示例

本文整理汇总了Python中fcntl.LOCK_NB属性的典型用法代码示例。如果您正苦于以下问题:Python fcntl.LOCK_NB属性的具体用法?Python fcntl.LOCK_NB怎么用?Python fcntl.LOCK_NB使用的例子?那么恭喜您, 这里精选的属性代码示例或许可以为您提供帮助。您也可以进一步了解该属性所在模块fcntl的用法示例。

在下文中一共展示了fcntl.LOCK_NB属性的27个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: load

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def load(self):
        # Try to obtain exclusive access to the journal cache
        filename = join(config.app_dir, 'replay.jsonl')
        try:
            try:
                # Try to open existing file
                self.replayfile = open(filename, 'r+', buffering=1)
            except:
                if exists(filename):
                    raise	# Couldn't open existing file
                else:
                    self.replayfile = open(filename, 'w+', buffering=1)	# Create file
            if sys.platform != 'win32':	# open for writing is automatically exclusive on Windows
                lockf(self.replayfile, LOCK_EX|LOCK_NB)
        except:
            if __debug__: print_exc()
            if self.replayfile:
                self.replayfile.close()
            self.replayfile = None
            return False
        self.replaylog = [line.strip() for line in self.replayfile]
        return True 
开发者ID:EDCD,项目名称:EDMarketConnector,代码行数:24,代码来源:eddn.py


示例2: containers

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def containers(self, *args):
        log.info(" ".join(args))
        data = Run(data=True)(deimos.docker.docker("ps", "--no-trunc", "-q"))
        mesos_ids = []
        for line in data.splitlines():
            cid = line.strip()
            state = deimos.state.State(self.state_root, docker_id=cid)
            if not state.exists():
                continue
            try:
                state.lock("wait", LOCK_SH | LOCK_NB)
            except deimos.flock.Err:     # LOCK_EX held, so launch() is running
                mesos_ids += [state.mesos_container_id()]
        containers = Containers()
        for mesos_id in mesos_ids:
            container = containers.containers.add()
            container.value = mesos_id
        recordio.writeProto(containers)
        return 0 
开发者ID:mesosphere-backup,项目名称:deimos,代码行数:21,代码来源:docker.py


示例3: __init__

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def __init__(self, path, flags, seconds=default_timeout):
        """Construct a lockable file handle. Handles are recycled.
        If seconds is 0, LOCK_NB will be set. If LOCK_NB is set, seconds will
        be set to 0. If seconds is None, there will be no timeout; but flags
        will not be adjusted in any way.
        """
        full = os.path.abspath(path)
        flags, seconds = nb_seconds(flags, seconds)
        if full not in locks:
            _Struct.__init__(self, path=full,
                                   handle=None,
                                   fd=None,
                                   flags=flags,
                                   seconds=seconds)
            locks[self.path] = self 
开发者ID:mesosphere-backup,项目名称:deimos,代码行数:18,代码来源:flock.py


示例4: lock

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def lock(self):
        if self.handle is None or self.handle.closed:
            self.handle = open(self.path, "w+")
            self.fd = self.handle.fileno()
        if (self.flags & fcntl.LOCK_NB) != 0 or self.seconds is None:
            try:
                fcntl.flock(self.handle, self.flags)
            except IOError as e:
                if e.errno not in [errno.EACCES, errno.EAGAIN]:
                    raise e
                raise Locked(self.path)
        else:
            with timeout(self.seconds):
                try:
                    fcntl.flock(self.handle, self.flags)
                except IOError as e:
                    errnos = [errno.EINTR, errno.EACCES, errno.EAGAIN]
                    if e.errno not in errnos:
                        raise e
                    raise Timeout(self.path) 
开发者ID:mesosphere-backup,项目名称:deimos,代码行数:22,代码来源:flock.py


示例5: udpate_status

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def udpate_status(status_file_path, module_data, module):
	lock_file_path = "{}.lock".format(status_file_path)
	while True:
		status_file_lock = open(lock_file_path, 'r')
		try:
			fcntl.flock(status_file_lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
			with open(status_file_path, "r+") as status_file:
				data = json.load(status_file)
				status_file.seek(0)
				data[module].update(module_data)
				json.dump(data, status_file)
			fcntl.flock(status_file_lock, fcntl.LOCK_UN)
			return
		except:
			time.sleep(0.1)
		status_file_lock.close() 
开发者ID:BishopFox,项目名称:IDontSpeakSSL,代码行数:18,代码来源:utils.py


示例6: _create_pid_file

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def _create_pid_file():
    global fp
    pidfile = CONF.etc.pidfile
    if pidfile is None:
        raise UpdaterErr("No pidfile option found in config file.")
    try:
        fp = open(pidfile, 'w')
        # LOCK_EX    /* exclusive lock */
        # LOCK_NB   * don't block when locking */
        fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        fp.truncate()
        pid = os.getpid()
        fp.write(str(pid))
        fp.flush()
    except Exception as e:
        raise UpdaterErr("Failed to lock pidfile, perhaps named_updater is already running.") 
开发者ID:qunarcorp,项目名称:open_dnsdb,代码行数:18,代码来源:updater.py


示例7: __enter__

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def __enter__(self):
        if self.path is None:
            return self.pidfile
        self.pidfile = open(self.path, "a+")
        try:
            fcntl.flock(self.pidfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            self.pidfile = None
            raise SystemExit("Already running according to " + self.path)
        self.pidfile.seek(0)
        self.pidfile.truncate()
        self.pidfile.write(str(os.getpid()))
        self.pidfile.flush()
        self.pidfile.seek(0)
        return self.pidfile 
开发者ID:perfsonar,项目名称:pscheduler,代码行数:18,代码来源:pidfile.py


示例8: write_pidfile

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def write_pidfile(pid, pidfile):
    """
    This method writes the PID to the pidfile and locks it while the process is running.
    :param pid: PID of SmartHomeNG
    :param pidfile: Name of the pidfile to write to
    :type pid: int
    :type pidfile: str
    """
    fd = open(pidfile, 'w+')
    fd.write("%s" % pid)
    fd.close()
    # lock pidfile:
    try:
        fd = os.open(pidfile, os.O_RDONLY)
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        # don't close fd or lock is gone
    except OSError as e:
        print("Could not lock pid file: %d (%s)" % (e.errno, e.strerror) , file=sys.stderr) 
开发者ID:smarthomeNG,项目名称:smarthome,代码行数:23,代码来源:daemon.py


示例9: acquire

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def acquire(self, blocking=None):
        # Give an opportunity to set blocking with the class for context use
        if blocking is None:
            blocking = self.blocking
        if blocking:
            lock_mode = fcntl.LOCK_EX
        else:
            lock_mode = fcntl.LOCK_EX | fcntl.LOCK_NB
        if self.lock_file.closed:
            self._get_lock_file_handle()
        if not self.locked:
            try:
                self.lock = fcntl.flock(self.lock_file, lock_mode)
                self.locked = True
            except IOError:
                raise IOError("File '{}' is already locked.".format(self.lock_file_name))
        else:
            raise IOError("File '{}' is already locked.".format(self.lock_file_name)) 
开发者ID:Tufin,项目名称:pytos,代码行数:21,代码来源:utils.py


示例10: AcquireFileLock

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def AcquireFileLock(target_file, flags):
  """ Lock the target file. Note that if |target_file| is closed, the lock is
    automatically released.
  Args:
    target_file: file handle of the file to acquire lock.
    flags: can be any of the type LOCK_EX, LOCK_SH, LOCK_NB, or a bitwise
      OR combination of flags.
  """
  assert flags in (
      LOCK_EX, LOCK_SH, LOCK_NB, LOCK_EX | LOCK_NB, LOCK_SH | LOCK_NB)
  if os.name == 'nt':
    _LockImplWin(target_file, flags)
  elif os.name == 'posix':
    _LockImplPosix(target_file, flags)
  else:
    raise NotImplementedError('%s is not supported' % os.name) 
开发者ID:FSecureLABS,项目名称:Jandroid,代码行数:18,代码来源:lock.py


示例11: bounce_lock

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def bounce_lock(name):
    """Acquire a bounce lockfile for the name given. The name should generally
    be the service namespace being bounced.
    This is a contextmanager. Please use it via 'with bounce_lock(name):'.
    :param name: The lock name to acquire"""
    lockfile = "/var/lock/%s.lock" % name
    with open(lockfile, "w") as fd:
        remove = False
        try:
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            remove = True
            yield
        except IOError:
            raise LockHeldException("Service %s is already being bounced!" % name)
        finally:
            if remove:
                os.remove(lockfile) 
开发者ID:Yelp,项目名称:paasta,代码行数:21,代码来源:bounce_lib.py


示例12: test_bounce_lock

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def test_bounce_lock(self):
        import fcntl
        lock_name = "the_internet"
        lock_file = "/var/lock/%s.lock" % lock_name
        fake_fd = mock.mock_open()
        with mock.patch("builtins.open", fake_fd, autospec=None) as open_patch:
            with mock.patch("fcntl.lockf", autospec=None) as lockf_patch:
                with mock.patch("os.remove", autospec=None) as remove_patch:
                    with bounce_lib.bounce_lock(lock_name):
                        pass
        open_patch.assert_called_once_with(lock_file, "w")
        lockf_patch.assert_called_once_with(
            fake_fd.return_value, fcntl.LOCK_EX | fcntl.LOCK_NB
        )
        fake_fd.return_value.__exit__.assert_called_once_with(None, None, None)
        remove_patch.assert_called_once_with(lock_file) 
开发者ID:Yelp,项目名称:paasta,代码行数:19,代码来源:test_bounce_lib.py


示例13: lock

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def lock(self):
        """
        Locks the package to avoid concurrent operations on its shared
        resources.
        Currently, the only resource shared among scripts executed from
        different directories is the repository.
        """
        if not self.locking_enabled:
            LOG.debug("This package has no shared resources to lock")
            return
        LOG.debug("Checking for lock on file {}.".format(self.lock_file_path))
        self.lock_file = open(self.lock_file_path, "w")
        try:
            fcntl.lockf(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as exc:
            RESOURCE_UNAVAILABLE_ERROR = 11
            if exc.errno == RESOURCE_UNAVAILABLE_ERROR:
                LOG.info("Waiting for other process to finish operations "
                         "on {}.".format(self.name))
            else:
                raise
        fcntl.lockf(self.lock_file, fcntl.LOCK_EX) 
开发者ID:open-power-host-os,项目名称:builds,代码行数:25,代码来源:package.py


示例14: write_pid

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def write_pid(path):
    """Writes our PID to *path*."""
    try:
        pid = os.getpid()
        with io.open(path, mode='w', encoding='utf-8') as pidfile:
            # Get a non-blocking exclusive lock
            fcntl.flock(pidfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            pidfile.seek(0)
            pidfile.truncate(0)
            pidfile.write(unicode(pid))
    except:
        logging.error(_("Could not write PID file: %s") % path)
        raise # This raises the original exception
    finally:
        try:
            pidfile.close()
        except:
            pass 
开发者ID:jimmy201602,项目名称:django-gateone,代码行数:20,代码来源:utils.py


示例15: singleton

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def singleton(lockfile, text="semaphore"):
    """
    Open a lockfile exclusively.
    """
    if not lockfile.parent.exists():
        mkdir(lockfile.parent)
    try:
        if has_fcntl:
            f = open(lockfile, "w")
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        else:
            if lockfile.exists():
                lockfile.unlink()
            fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            f = open(fd, "w")
        f.write(text)
    except IOError:
        return None
    return f 
开发者ID:Chia-Network,项目名称:chia-blockchain,代码行数:23,代码来源:server.py


示例16: __init__

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def __init__(self, client_name):
        self.lockfile = os.path.join(get_cache_dir("client_locks"), client_name)
        logger.debug("SingleInstance lockfile: " + self.lockfile)
        if sys.platform == 'win32':
            try:
                # file already exists, we try to remove (in case previous execution was interrupted)
                if os.path.exists(self.lockfile):
                    os.unlink(self.lockfile)
                    self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            except OSError as e:
                if e.errno == 13:
                    logger.error("Another instance is already running, quitting.")
                    sys.exit(-1)
                else:
                    raise e
        else:  # non Windows
            self.fp = open(self.lockfile, 'w')
            try:
                fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                logger.error("Another instance is already running, quitting.")
                sys.exit(-1) 
开发者ID:ActivityWatch,项目名称:aw-client,代码行数:24,代码来源:singleinstance.py


示例17: main

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def main(self, test_args: dict = None):
        start_time = datetime.now()
        args = self.parser.parse_args(test_args)
        root_folder = Path(args.root_folder).absolute()
        db_path = Path(args.db_path) if args.db_path else root_folder
        if not root_folder.exists():
            root_folder.mkdir(parents=True, mode=0o700)
        setup_logging(args.log_level, args.logfile, root_folder)
        log.warning(f"gphotos-sync {__version__} {start_time}")
        args = self.fs_checks(root_folder, args)
        lock_file = db_path / "gphotos.lock"
        fp = lock_file.open("w")
        with fp:
            try:
                if os.name != "nt":
                    fcntl.lockf(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                log.warning("EXITING: database is locked")
                sys.exit(0)
            log.info(self.version_string)
            # configure and launch
            # noinspection PyBroadException
            try:
                self.setup(args, db_path)
                self.start(args)
            except KeyboardInterrupt:
                log.error("User cancelled download")
                log.debug("Traceback", exc_info=True)
            except BaseException:
                log.error("\nProcess failed.", exc_info=True)
            finally:
                log.warning("Done.")
        elapsed_time = datetime.now() - start_time
        log.info("Elapsed time = %s", elapsed_time) 
开发者ID:gilesknap,项目名称:gphotos-sync,代码行数:43,代码来源:Main.py


示例18: write_pid_file

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def write_pid_file(pid_file, pid):
    import fcntl
    import stat
    try:
        fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                     stat.S_IRUSR | stat.S_IWUSR)
    except OSError as e:
        shell.print_exception(e)
        return -1
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert flags != -1
    flags |= fcntl.FD_CLOEXEC
    r = fcntl.fcntl(fd, fcntl.F_SETFD, flags)
    assert r != -1
    # There is no platform independent way to implement fcntl(fd, F_SETLK, &fl)
    # via fcntl.fcntl. So use lockf instead
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        r = os.read(fd, 32)
        if r:
            logging.error('already started at pid %s' % common.to_str(r))
        else:
            logging.error('already started')
        os.close(fd)
        return -1
    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0 
开发者ID:ntfreedom,项目名称:neverendshadowsocks,代码行数:32,代码来源:daemon.py


示例19: lock_file

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def lock_file(self, name, blocking=True):
        if name in self.locks:
            self.log(DEBUG, "lock_file: %s already locked", name)
            return
        try:
            f = open(name, 'w')
        except IOError as e:
            self.log(ERROR, "open: %s", e)
            return e.errno
        self.locks[name] = f
        flag = fcntl.LOCK_EX
        if not blocking:
            flag |= fcntl.LOCK_NB
        while True:
            try:
                status = fcntl.lockf(f, flag)
                break
            except IOError as e:
                if e.errno in (errno.EAGAIN, errno.EACCES) and not blocking:
                    f.close()
                    del self.locks[name]
                    return e.errno
                if e.errno != errno.EINTR:
                    status = e.errno
                    self.log(ERROR, "lockf: %s", e)
                    self.fail(106)
        return status 
开发者ID:rucio,项目名称:rucio,代码行数:30,代码来源:pcache.py


示例20: __enter__

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def __enter__(self):
        self._worker_lock_dir.mkdir(exist_ok=True)
        worker_id = 0
        while True:
            self._lock_file = (self._worker_lock_dir / str(worker_id)).open('w')
            try:
                fcntl.lockf(self._lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                return worker_id
            except OSError as e:
                if e.errno not in (errno.EAGAIN, errno.EACCES):
                    raise
            worker_id += 1 
开发者ID:PrivacyScore,项目名称:PrivacyScore,代码行数:14,代码来源:utils.py


示例21: _lock_file

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def _lock_file(f, dotlock=True):
    """Lock file f using lockf and dot locking."""
    dotlock_done = False
    try:
        if fcntl:
            try:
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError, e:
                if e.errno in (errno.EAGAIN, errno.EACCES, errno.EROFS):
                    raise ExternalClashError('lockf: lock unavailable: %s' %
                                             f.name)
                else:
                    raise
        if dotlock:
            try:
                pre_lock = _create_temporary(f.name + '.lock')
                pre_lock.close()
            except IOError, e:
                if e.errno in (errno.EACCES, errno.EROFS):
                    return  # Without write access, just skip dotlocking.
                else:
                    raise
            try:
                if hasattr(os, 'link'):
                    os.link(pre_lock.name, f.name + '.lock')
                    dotlock_done = True
                    os.unlink(pre_lock.name)
                else:
                    os.rename(pre_lock.name, f.name + '.lock')
                    dotlock_done = True
            except OSError, e:
                if e.errno == errno.EEXIST or \
                  (os.name == 'os2' and e.errno == errno.EACCES):
                    os.remove(pre_lock.name)
                    raise ExternalClashError('dot lock unavailable: %s' %
                                             f.name)
                else:
                    raise 
开发者ID:glmcdona,项目名称:meddle,代码行数:40,代码来源:mailbox.py


示例22: __init__

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def __init__(self, args, single_app=False):
        # Ensure only one app instance is running.
        # From https://stackoverflow.com/questions/220525/
        #              ensure-a-single-instance-of-an-application-in-linux#221159
        if single_app:
            pid_file = os.path.join(SETTINGS_DIR, "restatic.pid")
            lockfile = open(pid_file, "w+")
            try:
                fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
                self.lockfile = lockfile
            except OSError:
                print("An instance of Restatic is already running.")
                sys.exit(1)
        super().__init__(args)
        self.setQuitOnLastWindowClosed(False)
        self.scheduler = RestaticScheduler(self)
        # Prepare tray and main window
        self.tray = TrayMenu(self)
        self.main_window = MainWindow(self)
        self.main_window.show()
        self.backup_started_event.connect(self.backup_started_event_response)
        self.backup_finished_event.connect(self.backup_finished_event_response)
        self.backup_cancelled_event.connect(self.backup_cancelled_event_response) 
开发者ID:Mebus,项目名称:restatic,代码行数:29,代码来源:application.py


示例23: _acquire

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def _acquire(self):
        open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
        fd = os.open(self._lock_file, open_mode)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            os.close(fd)
        else:
            self._lock_file_fd = fd
        return None 
开发者ID:a4k-openproject,项目名称:a4kScrapers,代码行数:13,代码来源:filelock.py


示例24: format_lock_flags

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def format_lock_flags(flags):
    tokens = [("EX", fcntl.LOCK_EX), ("SH", fcntl.LOCK_SH),
               ("UN", fcntl.LOCK_UN), ("NB", fcntl.LOCK_NB)]
    return "|".join(s for s, flag in tokens if (flags & flag) != 0) 
开发者ID:mesosphere-backup,项目名称:deimos,代码行数:6,代码来源:flock.py


示例25: nb_seconds

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def nb_seconds(flags, seconds):
    if seconds == 0:
        flags |= fcntl.LOCK_NB
    if (flags & fcntl.LOCK_NB) != 0:
        seconds = 0
    return flags, seconds 
开发者ID:mesosphere-backup,项目名称:deimos,代码行数:8,代码来源:flock.py


示例26: NX_init

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def NX_init(port, baudrate):
    global ser, NX_lf
    ser.port = port
    ser.baudrate = baudrate
    ser.timeout = 0.2
    ser.open()
    import fcntl, serial
    try:
        fcntl.flock(ser.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        print ('Serial port is busy')
        return False
    # Buffer des Displays leeren
    # - Ungültigen Befehl senden
    # - Aufwachbefehl senden
    ser.write(b'nop' + NX_lf)
    ser.write(b'sleep=0' + NX_lf)
    # - Warten
    ser.flush()
    time.sleep(0.5)
    # - Empfangene Zeichen löschen
    ser.flushInput()
    # Immer eine Rückmeldung erhalten
    ser.write(b'ref 0' + NX_lf)
    ser.flush()
    return NX_waitok() 
开发者ID:WLANThermo,项目名称:WLANThermo_v2,代码行数:30,代码来源:nextion_hwclock.py


示例27: _write

# 需要导入模块: import fcntl [as 别名]
# 或者: from fcntl import LOCK_NB [as 别名]
def _write(filename, contents):
        ''' Actually write the file contents to disk. This helps with mocking. '''
        tmp_filename = filename + '.yedit'
        with open(tmp_filename, 'w') as yfd:
            fcntl.flock(yfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            yfd.write(contents)
            fcntl.flock(yfd, fcntl.LOCK_UN)
        os.rename(tmp_filename, filename) 
开发者ID:RedHatOfficial,项目名称:ansible-redhat_openshift_utils,代码行数:13,代码来源:oc_obj.py



注:本文中的fcntl.LOCK_NB属性示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。