本篇内容介绍了“Python如何实现关键路径和七格图计算”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1.主程序
主程序主要实现了一个Project类,其中包含了计算关键路径和七格图的方法。具体实现方式如下:
1. 定义了一个Activity类,包含了活动的id、名称、持续时间和紧前任务列表等属性。
2. 定义了一个Project类,包含了活动列表、项目持续时间、日志等属性,以及计算关键路径、计算七格图、计算总浮动时间、计算自由浮动时间等方法。
3. 从JSON文件中读取活动信息,并创建Project对象并添加活动。
4. 调用Project对象的calculate方法,计算每个活动的最早开始时间、最晚开始时间等数据。
5. 调用Project对象的calculate_critical_path方法,计算关键路径。
6. 调用Project对象的calculate_project_duration方法,计算项目总工期。
7. 使用Jinja2模板引擎生成项目的活动清单,并将关键路径
import json from datetime import datetime from typing import List import graphviz from jinja2 import Template from activity import Activity class Project: def __init__(self): self.activities: List[Activity] = [] self.duration = 0 self.logger = [] def log(self, log: str) -> None: self.logger.append(log) def add_activity(self, activity: Activity) -> None: """ 添加一个活动到项目中 :param activity: 待添加的活动 """ # 将活动添加到项目中 self.activities.append(activity) def calculate(self) -> None: """ 计算整个项目的关键信息 :return: None """ self.calculate_successor() self._calculate_forward_pass() # 计算正推法 self._calculate_backward_pass() # 计算倒推法 self._calculate_total_floats() # 计算总浮动时间 self._calculate_free_floats() # 计算自由浮动时间 def calculate_successor(self) -> None: self.log("开始计算紧后活动") for act in self.activities: for pred in act.predecessors: for act_inner in self.activities: if act_inner.id == pred: act_inner.successors.append(act.id) def _calculate_forward_pass(self) -> None: self.log("## 开始正推法计算") # 进入 while 循环,只有当所有活动的最早开始时间和最早完成时间都已经计算出来时,才会退出循环 while not self._is_forward_pass_calculated(): # 遍历每个活动 for activity in self.activities: # 如果活动的最早开始时间已经被计算过,则跳过 if activity.est is not None: continue # 如果活动没有前置活动, 则从1开始计算最早开始时间和最早结束时间 if not activity.predecessors: activity.est = 1 activity.eft = activity.est + activity.duration - 1 self.log( f"活动 {activity.name} 没有紧前活动,设定最早开始时间为1, 并根据工期计算最早结束时间为{activity.eft}") else: # 计算当前活动的所有前置活动的最早完成时间 predecessors_eft = [act.eft for act in self.activities if act.id in activity.predecessors and act.eft is not None] # 如果当前活动的所有前置活动的最早完成时间都已经计算出来,则计算当前活动的最早开始时间和最早完成时间 if len(predecessors_eft) == len(activity.predecessors): activity.est = max(predecessors_eft) + 1 activity.eft = activity.est + activity.duration - 1 self.log( f"活动 {activity.name} 紧前活动已完成正推法计算, 开始日期按最早开始时间里面最大的," + f"设定为{activity.est}并根据工期计算最早结束时间为{activity.eft}") # 更新项目总持续时间为最大最早完成时间 self.duration = max([act.eft for act in self.activities]) def _calculate_backward_pass(self) -> None: """ 计算倒推法 :return: None """ self.log("## 开始倒推法计算") # 输出提示信息 # 进入 while 循环,只有当所有活动的最晚开始时间和最晚完成时间都已经计算出来时,才会退出循环 while not self._is_backward_pass_calculated(): # 遍历每个活动 for act in reversed(self.activities): # 如果活动的最晚开始时间已经被计算过,则跳过 if act.lft is not None: continue # 如果活动没有后继活动, 则从总持续时间开始计算最晚开始时间和最晚结束时间 if not act.successors: act.lft = self.duration act.lst = act.lft - act.duration + 1 self.log(f"活动 {act.name} 没有紧后活动,按照正推工期设定最晚结束时间为{act.lft}," + f"并根据工期计算最晚开始时间为{act.lst}") else: # 计算当前活动的所有后继活动的最晚开始时间 successors_lst = self._calculate_lst(act) # 如果当前活动的所有后继活动的最晚开始时间都已经计算出来,则计算当前活动的最晚开始时间和最晚完成时间 if len(successors_lst) == len(act.successors): act.lft = min(successors_lst) - 1 act.lst = act.lft - act.duration + 1 self.log(f"活动 {act.name} 紧后活动计算完成,按照倒推工期设定最晚结束时间为{act.lft}," + f"并根据工期计算最晚开始时间为{act.lst}") # 更新项目总持续时间为最大最晚完成时间 self.duration = max([act.lft for act in self.activities]) def _calculate_lst(self, activity: Activity) -> List[int]: """计算某一活动的所有最晚开始时间 :param activity: 活动对象 :return: 最晚开始时间列表 """ rst = [] # 初始化结果列表 for act in activity.successors: # 遍历该活动的后继活动 for act2 in self.activities: # 遍历所有活动 if act2.id == act and act2.lst is not None: # 如果找到了该后继活动且其最晚开始时间不为空 rst.append(act2.lst) # 将最晚开始时间加入结果列表 return rst # 返回结果列表 def _is_forward_pass_calculated(self) -> bool: """ 判断整个项目正推法计算已经完成 :return: 若已计算正向传递则返回True,否则返回False """ for act in self.activities: # 遍历所有活动 if act.est is None or act.eft is None: # 如果该活动的最早开始时间或最早完成时间为空 return False # 则返回False,表示还未计算正向传递 return True # 如果所有活动的最早开始时间和最早完成时间都已计算,则返回True,表示已计算正向传递 def _is_backward_pass_calculated(self) -> bool: """ 判断整个项目倒推法计算已经完成 :return: 若已计算倒推法则返回True,否则返回False """ for act in self.activities: # 遍历所有活动 if act.lst is None or act.lft is None: # 如果该活动的最晚开始时间或最晚完成时间为空 return False # 则返回False,表示还未计算倒推法 return True # 如果所有活动的最晚开始时间和最晚完成时间都已计算,则返回True,表示已计算倒推法 def _calculate_total_floats(self) -> None: """ 计算所有活动的总浮动时间 :return: None """ self.log(f"## 开始计算项目所有活动的总浮动时间") for act in self.activities: # 遍历所有活动 if act.est is not None and act.lst is not None: # 如果该活动的最早开始时间和最晚开始时间都已计算 act.tf = act.lst - act.est # 则计算该活动的总浮动时间 self.log(f"计算{act.name}的总浮动时间" + f"最晚开始时间{act.lst} - 最早开始时间{act.est} = {act.tf}", ) else: # 如果该活动的最早开始时间或最晚开始时间为空 act.tf = None # 则将该活动的总浮动时间设为None def _calculate_free_floats(self) -> None: """ 计算所有活动的自由浮动时间 :return: None """ self.log(f"## 开始计算项目所有活动的自由浮动时间") # 输出提示信息 for act in self.activities: # 遍历所有活动 if act.tf == 0: # 如果该活动的总浮动时间为0 self.log(f"计算{act.name}的自由浮动时间" + f"因为{act.name}的总浮动时间为0,自由浮动时间为0") # 输出提示信息 act.ff = 0 # 则将该活动的自由浮动时间设为0 elif act.tf > 0: # 如果该活动的总浮动时间大于0 self.log(f"计算{act.name}的自由浮动时间") # 输出提示信息 self.log(f"- {act.name}的总浮动时间{act.tf} > 0,") # 输出提示信息 tmp = [] # 初始化临时列表 for act2 in self.activities: # 遍历所有活动 if act2.id in act.successors: # 如果该活动是该活动的紧后活动 self.log(f"- {act.name}的紧后活动{act2.name}的自由浮动动时间为{act2.tf}") # 输出提示信息 tmp.append(act2.tf) # 将该紧后活动的自由浮动时间加入临时列表 if len(tmp) != 0: # 如果临时列表不为空 act.ff = act.tf - max(tmp) # 则计算该活动的自由浮动时间 if act.ff < 0: act.ff = 0 self.log(f"- 用活动自己的总浮动{act.tf}减去多个紧后活动总浮动的最大值{max(tmp)} = {act.ff}") else: # 如果临时列表为空 act.ff = act.tf # 则将该活动的自由浮动时间设为总浮动时间 def calculate_critical_path(self) -> List[Activity]: """ 计算整个项目的关键路径 :return: 整个项目的关键路径 """ ctc_path = [] # 初始化关键路径列表 for act in self.activities: # 遍历所有活动 if act.tf == 0: # 如果该活动的总浮动时间为0 ctc_path.append(act) # 则将该活动加入关键路径列表 return ctc_path # 返回关键路径列表 def calculate_project_duration(self) -> int: """ 计算整个项目的持续时间 :return: 整个项目的持续时间 """ return max(activity.eft for activity in self.activities) # 返回所有活动的最早完成时间中的最大值,即整个项目的持续时间 # 从JSON文件中读取活动信息 with open('activities.json', 'r', encoding='utf-8') as f: activities_data = json.load(f) # 创建Project对象并添加活动 project = Project() for activity_data in activities_data: activity = Activity( activity_data['id'], activity_data['name'], activity_data['duration'], activity_data['predecessors'] ) project.add_activity(activity) # 计算每个活动的最早开始时间、最晚开始时间等数据 project.calculate() # 计算关键路径和项目总工期 critical_path = project.calculate_critical_path() project_duration = project.calculate_project_duration() # 生成项目的活动清单 with open('template.html', 'r', encoding='utf-8') as f: template = Template(f.read()) html = template.render( activities=project.activities, critical_path=critical_path, project_duration=project_duration, log=project.logger ) # 生成项目进度网络图 aon_graph = graphviz.Digraph(format='png', graph_attr={'rankdir': 'LR'}) for activity in project.activities: aon_graph.node(str(activity.id), activity.name) for predecessor in activity.predecessors: aon_graph.edge(str(predecessor), str(activity.id)) timestamp = datetime.now().strftime('%Y%m%d%H%M%S') aon_filename = f"aon_{timestamp}" aon_graph.render(aon_filename) # 将项目进度网络图插入到HTML文件中 aon_image = f'<img src="{aon_filename}.png" alt="Precedence Diagramming Method: AON">' html = html.replace('<p>Precedence Diagramming Method: AON: <br/>[image]</p>', '<p>紧前关系绘图法: AON: <br/>' + aon_image + '</p>') filename = datetime.now().strftime('%Y%m%d%H%M%S') + '.html' with open(filename, 'w', encoding='utf-8') as f: f.write(html)
2.活动类
程序名:
activity.py
class Activity: """ 活动类,用于表示项目中的一个活动。 Attributes: id (int): 活动的唯一标识符。 name (str): 活动的名称。 duration (int): 活动的持续时间。 predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。 est (int): 活动的最早开始时间。 lst (int): 活动的最晚开始时间。 eft (int): 活动的最早完成时间。 lft (int): 活动的最晚完成时间。 tf (int): 活动的总浮动时间。 ff (int): 活动的自由浮动时间。 successors (List[int]): 活动的后继活动列表,存储后继活动的Activity对象。 """ def __init__(self, id: int, name: str, duration: int, predecessors: List[int]): """ 初始化活动对象。 Args: id (int): 活动的唯一标识符。 name (str): 活动的名称。 duration (int): 活动的持续时间。 predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。 """ self.id = id self.name = name self.duration = duration self.predecessors = predecessors self.est = None self.lst = None self.eft = None self.lft = None self.tf = None self.ff = None self.successors = [] def __str__(self): return f"id: {self.id}, name: {self.name}, est: {self.est}, lst: {self.lst}, eft: {self.eft}, lft: {self.lft}," + f"successors: {self.successors}"
3.任务列表JSON文件
文件名:
activities.json
[ { "id": 1, "name": "A", "duration": 2, "predecessors": [] }, { "id": 9, "name": "A2", "duration": 3, "predecessors": [] }, { "id": 10, "name": "A3", "duration": 2, "predecessors": [] }, { "id": 2, "name": "B", "duration": 3, "predecessors": [ 1, 9 ] }, { "id": 3, "name": "C", "duration": 4, "predecessors": [ 1 ] }, { "id": 4, "name": "D", "duration": 2, "predecessors": [ 2,10 ] }, { "id": 5, "name": "E", "duration": 3, "predecessors": [ 2 ] }, { "id": 6, "name": "F", "duration": 2, "predecessors": [ 3 ] }, { "id": 7, "name": "G", "duration": 3, "predecessors": [ 4, 5 ] }, { "id": 8, "name": "H", "duration": 2, "predecessors": [ 6, 7 ] }, { "id": 11, "name": "H2", "duration": 4, "predecessors": [ 6, 7 ] } ]
4.输出模板文件
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>PMP关键路径计算</title> <style> table { border-collapse: collapse; width: 100%; } th, td { border: 1px solid black; padding: 8px; text-align: center; } th { background-color: #4CAF50; color: white; } .critical { background-color: #ffcccc; } </style> </head> <body> <h2>活动清单</h2> <table> <tr> <th>ID</th> <th>活动名</th> <th>持续时间</th> <th>紧前活动</th> <th>紧后活动</th> <th>最早开始时间EST</th> <th>最早结束时间EFT</th> <th>最晚开始时间LST</th> <th>最晚结束时间LFT</th> <th>总浮动时间TF</th> <th>自由浮动时间FF</th> </tr> {% for activity in activities %} <tr {% if activity in critical_path %}class="critical" {% endif %}> <td>{{ activity.id }}</td> <td>{{ activity.name }}</td> &n