-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproblemsolver.py
179 lines (154 loc) · 6.3 KB
/
problemsolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from stats import Stats
from solver_utils import *
from common import *
from random import shuffle
class ProblemSolver:
"""
Solves problems by searching for a solution.
"""
def __init__(self):
self.stats = Stats()
def search_problem_solution(self, problem, algorithm, strategy=SearchStrategy.NONE):
"""
Applys a search algorithm and strategy to a problem.
"""
if algorithm == SearchAlgorithm.TREESEARCH:
strategy = strategy if strategy != SearchStrategy.NONE else SearchStrategy.A_STAR
return self.treesearch(problem, strategy)
elif algorithm == SearchAlgorithm.GRAPHSEARCH:
strategy = strategy if strategy != SearchStrategy.NONE else SearchStrategy.A_STAR
return self.graphsearch(problem, strategy)
elif algorithm == SearchAlgorithm.RBFS:
return self.RBFS(problem)
elif algorithm == SearchAlgorithm.ITERATIVE_DEEPENING:
return self.iterative_deepening(problem)
return NotImplemented
def expand(self, problem, pnode):
"""
Expands a node and returns its successors.
"""
successors = []
self.stats.stat_node_expansion(pnode)
for act_res in problem.succ_f(pnode.state):
cnode = Node(act_res.state, pnode, act_res.action, pnode.depth+1, pnode.pathCost+act_res.action.cost)
cnode.h = problem.h(cnode)
cnode.f = problem.f(cnode)
successors.append(cnode)
self.stats.stat_node_creation(cnode)
#shuffle(successors)
return successors
def path(self, root, node):
"""
Returns a path from a root node to a node in a deeper layer.
"""
path = []
while node != root:
if node == None:
return None
path.insert(0, node);
node = node.parent
return path
def get_stats(self):
"""
Returns the statistics generated by applying search algorithms to
problems.
"""
return self.stats
def treesearch(self, problem, strategy):
"""
Searches for a solution to the given problem using the treesearch
algorithm and a given search strategy.
"""
root = Node(problem.start_state, None, Action(0, None))
self.stats.stat_node_creation(root)
fringe = Fringe(strategy)
fringe.insert(root)
while True:
if fringe.empty():
return None
fringe.print_contents()
currNode = fringe.pop()
if (problem.is_goal(currNode)):
return self.path(root, currNode)
successors = self.expand(problem, currNode)
if problem.maximum_depth > 0 and len([x for x in successors if x.depth > problem.maximum_depth]) > 0:
return None
fringe.extend(successors)
def graphsearch(self, problem, strategy):
"""
Searches for a solution to the given problem using the graphsearch
algorithm and a given search strategy.
"""
root = Node(problem.start_state, None, Action(0, None))
self.stats.stat_node_creation(root)
fringe = Fringe(strategy)
fringe.insert(root)
closed = set([])
while True:
if fringe == []:
return None
fringe.print_contents()
currNode = fringe.pop()
if (problem.is_goal(currNode)):
return self.path(root, currNode)
if not currNode.state in closed:
closed.add(currNode.state)
successors = self.expand(problem, currNode)
accepted_successors = [item for item in successors if not item.state in closed]
rejected_successors = [item for item in successors if item.state in closed]
self.stats.stat_node_deletion(rejected_successors)
if problem.maximum_depth > 0 and len([x for x in accepted_successors if x.depth > problem.maximum_depth]) > 0:
return None
fringe.extend(accepted_successors)
else:
self.stats.stat_node_ignoring(currNode)
def RBFS(self, problem):
"""
Searches for a solution to the given problem using the RBFS algorithm.
"""
def recursion(problem, cNode, cLimit):
if problem.is_goal(cNode):
return ([cNode], 0)
successors = self.expand(problem, cNode)
if successors == []:
return (None, INFINITY)
for node in successors:
node.f = max(cNode.f, problem.f(node))
while True:
successors.sort(key=lambda x: x.f)
best = successors[0]
if best.f > cLimit:
self.stats.stat_node_deletion(successors)
return (None, best.f)
limit = min(cLimit, successors[1].f) if len(successors) > 1 else cLimit
rnl, rlim = recursion(problem, best, limit)
if rnl != None:
rnl.insert(0, cNode)
return (rnl, 0)
best.f = rlim
root = Node(problem.start_state, None, Action(0, None))
self.stats.stat_node_creation(root)
return recursion(problem, root, INFINITY)[0][1:]
def iterative_deepening(self, problem, max_depth=1):
"""
Searches for a solution to the given problem using the Iterative
Deepening search algorithm.
"""
if max_depth > problem.maximum_depth:
return None
created_nodes_start = self.stats.created_nodes
root = Node(problem.start_state, None, Action(0, None))
fringe = Fringe(SearchStrategy.DEPTH_FIRST)
fringe.insert(root)
while True:
if fringe.empty():
break
fringe.print_contents()
currNode = fringe.pop()
if (problem.is_goal(currNode)):
return self.path(root, currNode)
if currNode.depth <= max_depth:
fringe.extend(self.expand(problem, currNode))
self.stats.deleted_nodes += self.stats.created_nodes - created_nodes_start
self.stats.calc_memory_usage()
return self.iterative_deepening(problem, max_depth+1)