from collections import deque
class Process:
def __init__(self, process_name: str, arrival_time: int, burst_time: int) -> None:
self.process_name = process_name # process name
self.arrival_time = arrival_time # arrival time of the process
# completion time of finished process or last interrupted time
self.stop_time = arrival_time
self.burst_time = burst_time # remaining burst time
self.waiting_time = 0 # total time of the process wait in ready queue
self.turnaround_time = 0 # time from arrival time to completion time
class MLFQ:
"""
MLFQ(Multi Level Feedback Queue)
https://en.wikipedia.org/wiki/Multilevel_feedback_queue
MLFQ has a lot of queues that have different priority
In this MLFQ,
The first Queue(0) to last second Queue(N-2) of MLFQ have Round Robin Algorithm
The last Queue(N-1) has First Come, First Served Algorithm
"""
def __init__(
self,
number_of_queues: int,
time_slices: list[int],
queue: deque[Process],
current_time: int,
) -> None:
# total number of mlfq's queues
self.number_of_queues = number_of_queues
# time slice of queues that round robin algorithm applied
self.time_slices = time_slices
# unfinished process is in this ready_queue
self.ready_queue = queue
# current time
self.current_time = current_time
# finished process is in this sequence queue
self.finish_queue: deque[Process] = deque()
def calculate_sequence_of_finish_queue(self) -> list[str]:
"""
This method returns the sequence of finished processes
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
>>> P3 = Process("P3", 0, 68)
>>> P4 = Process("P4", 0, 24)
>>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
>>> _ = mlfq.multi_level_feedback_queue()
>>> mlfq.calculate_sequence_of_finish_queue()
['P2', 'P4', 'P1', 'P3']
"""
sequence = []
for i in range(len(self.finish_queue)):
sequence.append(self.finish_queue[i].process_name)
return sequence
def calculate_waiting_time(self, queue: list[Process]) -> list[int]:
"""
This method calculates waiting time of processes
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
>>> P3 = Process("P3", 0, 68)
>>> P4 = Process("P4", 0, 24)
>>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
>>> _ = mlfq.multi_level_feedback_queue()
>>> mlfq.calculate_waiting_time([P1, P2, P3, P4])
[83, 17, 94, 101]
"""
waiting_times = []
for i in range(len(queue)):
waiting_times.append(queue[i].waiting_time)
return waiting_times
def calculate_turnaround_time(self, queue: list[Process]) -> list[int]:
"""
This method calculates turnaround time of processes
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
>>> P3 = Process("P3", 0, 68)
>>> P4 = Process("P4", 0, 24)
>>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
>>> _ = mlfq.multi_level_feedback_queue()
>>> mlfq.calculate_turnaround_time([P1, P2, P3, P4])
[136, 34, 162, 125]
"""
turnaround_times = []
for i in range(len(queue)):
turnaround_times.append(queue[i].turnaround_time)
return turnaround_times
def calculate_completion_time(self, queue: list[Process]) -> list[int]:
"""
This method calculates completion time of processes
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
>>> P3 = Process("P3", 0, 68)
>>> P4 = Process("P4", 0, 24)
>>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
>>> _ = mlfq.multi_level_feedback_queue()
>>> mlfq.calculate_turnaround_time([P1, P2, P3, P4])
[136, 34, 162, 125]
"""
completion_times = []
for i in range(len(queue)):
completion_times.append(queue[i].stop_time)
return completion_times
def calculate_remaining_burst_time_of_processes(
self, queue: deque[Process]
) -> list[int]:
"""
This method calculate remaining burst time of processes
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
>>> P3 = Process("P3", 0, 68)
>>> P4 = Process("P4", 0, 24)
>>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
>>> finish_queue, ready_queue = mlfq.round_robin(deque([P1, P2, P3, P4]), 17)
>>> mlfq.calculate_remaining_burst_time_of_processes(mlfq.finish_queue)
[0]
>>> mlfq.calculate_remaining_burst_time_of_processes(ready_queue)
[36, 51, 7]
>>> finish_queue, ready_queue = mlfq.round_robin(ready_queue, 25)
>>> mlfq.calculate_remaining_burst_time_of_processes(mlfq.finish_queue)
[0, 0]
>>> mlfq.calculate_remaining_burst_time_of_processes(ready_queue)
[11, 26]
"""
return [q.burst_time for q in queue]
def update_waiting_time(self, process: Process) -> int:
"""
This method updates waiting times of unfinished processes
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
>>> P3 = Process("P3", 0, 68)
>>> P4 = Process("P4", 0, 24)
>>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
>>> mlfq.current_time = 10
>>> P1.stop_time = 5
>>> mlfq.update_waiting_time(P1)
5
"""
process.waiting_time += self.current_time - process.stop_time
return process.waiting_time
def first_come_first_served(self, ready_queue: deque[Process]) -> deque[Process]:
"""
FCFS(First Come, First Served)
FCFS will be applied to MLFQ's last queue
A first came process will be finished at first
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
>>> P3 = Process("P3", 0, 68)
>>> P4 = Process("P4", 0, 24)
>>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
>>> _ = mlfq.first_come_first_served(mlfq.ready_queue)
>>> mlfq.calculate_sequence_of_finish_queue()
['P1', 'P2', 'P3', 'P4']
"""
finished: deque[Process] = deque() # sequence deque of finished process
while len(ready_queue) != 0:
cp = ready_queue.popleft() # current process
# if process's arrival time is later than current time, update current time
if self.current_time < cp.arrival_time:
self.current_time += cp.arrival_time
# update waiting time of current process
self.update_waiting_time(cp)
# update current time
self.current_time += cp.burst_time
# finish the process and set the process's burst-time 0
cp.burst_time = 0
# set the process's turnaround time because it is finished
cp.turnaround_time = self.current_time - cp.arrival_time
# set the completion time
cp.stop_time = self.current_time
# add the process to queue that has finished queue
finished.append(cp)
self.finish_queue.extend(finished) # add finished process to finish queue
# FCFS will finish all remaining processes
return finished
def round_robin(
self, ready_queue: deque[Process], time_slice: int
) -> tuple[deque[Process], deque[Process]]:
"""
RR(Round Robin)
RR will be applied to MLFQ's all queues except last queue
All processes can't use CPU for time more than time_slice
If the process consume CPU up to time_slice, it will go back to ready queue
>>> P1 = Process("P1", 0, 53)
>>> P2 = Process("P2", 0, 17)
python,python-scheduling.rar


Python是一种广泛使用的高级编程语言,以其易读性、简洁性和丰富的库支持而闻名。在Python中,调度是指安排任务在特定时间执行,这通常涉及到定时器、事件循环和线程等概念。`python-scheduling.rar`这个压缩包可能包含了一个关于Python定时任务调度的资源集合,比如代码示例、教程或库的文档。 Python中进行任务调度有多种方法,最常用的是使用内置的`time`模块和第三方库如`schedule`、`APScheduler`等。`time`模块提供了一些基本的时间处理函数,如`sleep()`,可以用来延迟程序执行。但若要实现更复杂的定时任务,如周期性执行或者在特定日期和时间执行,我们就需要借助更强大的工具。 `schedule`库是一个轻量级的Python任务调度库,它允许你在Python脚本中以自然的语言描述任务的执行计划。例如,你可以轻松地安排一个任务每天早上6点执行,或者每隔5分钟执行一次。使用`schedule`库的基本步骤包括定义任务函数、设定调度规则以及启动调度器。 ```python import schedule import time def job(): print("任务正在执行") # 每天的6:30执行job函数 schedule.every().day.at("06:30").do(job) # 运行调度器,检查是否有待执行的任务 while True: schedule.run_pending() time.sleep(1) ``` 除了`schedule`库,`APScheduler`是另一个功能更全面的定时任务库,它支持更多类型的调度策略,如间隔执行、一次性任务、cron定时等。`APScheduler`还可以与事件循环(如`asyncio`)集成,适合在异步环境中使用。 ```python from apscheduler.schedulers.blocking import BlockingScheduler def job(): print("任务正在执行") scheduler = BlockingScheduler() # 每天的6:30执行job函数 scheduler.add_job(job, 'cron', hour=6, minute=30) # 启动调度器 scheduler.start() ``` `python-scheduling.rar`中的资源可能涵盖了这些库的使用示例、配置方法以及如何在实际项目中应用它们的知识。通过学习和实践,你可以掌握如何在Python项目中有效地管理和调度任务,提高自动化处理和后台服务的效率。 在实际开发中,Python的定时任务调度常用于日志清理、数据同步、定时报告发送、爬虫定时运行等场景。理解并熟练掌握这些调度技术,对于提升你的Python编程能力,特别是在构建长期运行的服务时,具有重要意义。




































- 1

- #完美解决问题
- #运行顺畅
- #内容详尽
- #全网独家
- #注释完整

- 粉丝: 1w+
- 资源: 3866
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助


最新资源
- C# CAD操作:插入外部DWG图纸到当前数据库的方法与应用
- Platinum-md v1.2.0
- AutoCAD常见错误代码及其解释与解决方案
- 基于ensp的企业网络规划设计仿真.zip
- CAD系统变量.txt
- 多文档操作规范流程.txt
- 智能指针.txt
- CAD二次开发.txt
- AutoBatchBuildSolutions.ps1
- zadig-2.9.zip
- spring boot 整合 deepseek 应用,下载 就能用
- OpenGL在linux系统中的3D渲染实现
- 2024年网络安全宣传周手册.pdf
- 开源项目微信群永久二维码源码
- 2024年新能源产业深度研究.pdf
- 2024年中国金融行业网络安全案例集.pdf


