class Node:
    """A graph vertex wrapping an arbitrary payload object.

    Nodes are compared and hashed by their ``id`` only, so two distinct
    ``Node`` objects carrying the same ID are interchangeable inside sets
    and dictionaries.
    """

    # Next identifier handed out to a node created without an explicit ID.
    node_count = 0

    def __init__(self, contained, nodeId=None):
        """Create a node, auto-assigning an ID when none is given.

        Args:
            contained: The payload object stored in this node.
            nodeId: Explicit identifier. When ``None``, the current value of
                ``Node.node_count`` is used and the counter is incremented.
        """
        self.contained = contained
        # Outgoing edges; each edge object exposes the destination as ``right``.
        self.edges = []
        # Compare against None so that a falsy but valid ID such as 0 is kept
        # instead of being silently replaced by the counter value.
        if nodeId is not None:
            self.id = nodeId
        else:
            self.id = Node.node_count
            Node.node_count += 1

    def get_neighbors(self):
        """Return the set of nodes at the far end of this node's edges."""
        return {e.right for e in self.edges}

    def get_reachable(self, max_distance):
        """Return all nodes within ``max_distance`` hops of this node.

        The search is breadth-first and visits every node at most once, so
        it stays fast on graphs with cycles or undirected edges.

        Args:
            max_distance: Maximum number of edges to follow. Zero or a
                negative value yields an empty set.

        Returns:
            A new set of reachable nodes. This node itself is never part of
            the result, even when a cycle leads back to it.
        """
        visited = {self}
        frontier = {self}
        for _ in range(max_distance):
            # Expand one hop and keep only nodes not seen on a shorter path.
            frontier = {
                neighbor
                for node in frontier
                for neighbor in node.get_neighbors()
            } - visited
            if not frontier:
                break
            visited |= frontier
        visited.discard(self)
        return visited

    def __eq__(self, other):
        """Compare by ID; objects that are not nodes never compare equal."""
        if not isinstance(other, Node):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        """Hash using ID, consistent with ``__eq__``."""
        return hash(self.id)

    def __str__(self):
        """Use the string form of the contained object."""
        return str(self.contained)

    def __repr__(self):
        return str(self)
class Edge:
    """A weighted, directed connection between two nodes."""

    def __init__(self, left, right, weight=1.0):
        """Record the source (left), the destination (right) and the weight."""
        self.left, self.right, self.weight = left, right, weight

    def __str__(self):
        """Render the edge as "(source) -> (destination)"."""
        source_text = str(self.left)
        destination_text = str(self.right)
        return "(%s) -> (%s)" % (source_text, destination_text)

    def __repr__(self):
        return str(self)
# Display the documentation that Python builds from the docstrings above:
# first for the whole class, then for a single method.
help(Node)
help(Node.__str__)
We can write some unit tests to ensure our code works.
import unittest
class NodeTests(unittest.TestCase):
    """Unit tests for the Node class.

    The last two test methods fail on purpose: they demonstrate how the
    runner reports a failed assertion and an unexpected error.
    """

    def setUp(self):
        """Arrange for the shared Node ID counter to be restored after each test."""
        # test_node_init_auto_id overwrites Node.node_count; put the previous
        # value back so the change cannot leak into other tests or later code.
        self.addCleanup(setattr, Node, "node_count", Node.node_count)

    def _attach_neighbors(self, centre):
        """Give ``centre`` five outgoing edges and return the set of new neighbours."""
        neighbor_set = set()
        for neighbor_id in range(1, 6):
            tmp_node = Node("dummy_val", neighbor_id)
            centre.edges.append(Edge(centre, tmp_node))
            neighbor_set.add(tmp_node)
        return neighbor_set

    def test_node_init(self):
        """An explicit ID and the payload are stored; a new node has no edges."""
        test_contained_value = "This is a test string"
        test_node_id = 1337
        new_node = Node(test_contained_value, test_node_id)
        self.assertEqual(new_node.contained, test_contained_value)
        self.assertEqual(new_node.id, test_node_id)
        self.assertEqual(len(new_node.edges), 0)
        self.assertEqual(len(new_node.get_neighbors()), 0)

    def test_node_init_auto_id(self):
        """Without an ID the class counter is used and then incremented."""
        test_contained_value = "This is a test string"
        test_node_id = 1337
        Node.node_count = test_node_id
        new_node = Node(test_contained_value)
        self.assertEqual(new_node.contained, test_contained_value)
        self.assertEqual(new_node.id, test_node_id)
        self.assertEqual(Node.node_count, test_node_id + 1)

    def test_node_eq(self):
        """Two nodes with the same ID compare equal."""
        test_contained_value = "This is a test string"
        test_node_id = 1337
        new_node_1 = Node(test_contained_value, test_node_id)
        new_node_2 = Node(test_contained_value, test_node_id)
        self.assertTrue(new_node_1.__eq__(new_node_2))
        self.assertEqual(new_node_1, new_node_2)

    def test_node_hash(self):
        """A node hashes like its ID."""
        test_node_id = 1337
        new_node_1 = Node("This is a test string", test_node_id)
        self.assertEqual(hash(new_node_1), hash(test_node_id))

    def test_node_neighbors(self):
        """get_neighbors returns exactly the destinations of the edges."""
        new_node_1 = Node("This is a test string", 1337)
        self.assertEqual(len(new_node_1.get_neighbors()), 0)
        neighbor_set = self._attach_neighbors(new_node_1)
        self.assertEqual(len(new_node_1.get_neighbors()), len(neighbor_set))
        self.assertEqual(new_node_1.get_neighbors(), neighbor_set)

    def test_node_reachable_no_neighbors(self):
        """An isolated node reaches nothing at any distance."""
        new_node_1 = Node("This is a test string", 1337)
        for distance in (0, 1, 2):
            with self.subTest(distance=distance):
                self.assertEqual(new_node_1.get_reachable(distance), set())

    def test_node_reachable_d1_neighbors(self):
        """Direct neighbours are reachable from distance 1 onwards."""
        new_node_1 = Node("This is a test string", 1337)
        self.assertEqual(len(new_node_1.get_neighbors()), 0)
        neighbor_set = self._attach_neighbors(new_node_1)
        self.assertEqual(new_node_1.get_reachable(0), set())
        self.assertEqual(new_node_1.get_reachable(1), neighbor_set)
        self.assertEqual(new_node_1.get_reachable(2), neighbor_set)

    def test_node_reachable_deep(self):
        """On a chain, distance d reaches exactly the first d successors."""
        new_node_1 = Node("This is a test string", 1337)
        self.assertEqual(len(new_node_1.get_neighbors()), 0)
        # Build the chain new_node_1 -> 1 -> 2 -> 3 -> 4 -> 5.
        neighbor_list = []
        last_neighbor = new_node_1
        for neighbor_id in range(1, 6):
            tmp_node = Node("dummy_val", neighbor_id)
            last_neighbor.edges.append(Edge(last_neighbor, tmp_node))
            last_neighbor = tmp_node
            neighbor_list.append(tmp_node)
        self.assertEqual(new_node_1.get_reachable(0), set())
        for distance in range(1, 6):
            with self.subTest(distance=distance):
                self.assertEqual(
                    new_node_1.get_reachable(distance),
                    set(neighbor_list[:distance]),
                )

    def this_is_not_a_test(self):
        """Never collected: the method name does not start with ``test``."""
        self.assertTrue(True)

    def test_this_is_a_test(self):
        """Deliberate failure, kept to show how a failed assertion is reported."""
        self.assertTrue(False)

    def test_this_is_a_test_of_error(self):
        """Deliberate error, kept to show how an unexpected exception is reported."""
        self.assertTrue(True)
        # Reading a missing attribute raises AttributeError on purpose.
        x = self._not_existing + 1
# Collect every test method of NodeTests and run it with verbose output.
# loadTestsFromTestCase takes the TestCase class directly; passing a TestCase
# instance to loadTestsFromModule only found the tests by accident, through
# the instance's __class__ attribute.
suite = unittest.TestLoader().loadTestsFromTestCase(NodeTests)
unittest.TextTestRunner(verbosity=2).run(suite)
class Graph:
    """A collection of nodes and directed edges, indexed by node ID."""

    def __init__(self):
        """Create an empty graph."""
        # Maps node ID -> node object.
        self.nodes = {}
        # Maps (left node ID, right node ID) -> edge object.
        self.edges = {}

    def add_node(self, node):
        """Register a node unless its ID is already known.

        Returns:
            The node that was passed in, whether or not it was newly added.
        """
        self.nodes.setdefault(node.id, node)
        return node

    def _require_known(self, leftNode, rightNode):
        """Raise ValueError unless both nodes have been added to this graph."""
        if leftNode.id not in self.nodes:
            raise ValueError("Unknown left node")
        if rightNode.id not in self.nodes:
            raise ValueError("Unknown right node")

    def add_edge(self, leftNode, rightNode, weight=1.0):
        """Add an undirected edge, stored as one directed edge per direction.

        Returns:
            A tuple ``(left_to_right, right_to_left)`` of the two edges.

        Raises:
            ValueError: If either node has not been added to the graph.
        """
        # Validate both ends before touching anything so that a failure
        # cannot leave just one of the two directions behind.
        self._require_known(leftNode, rightNode)
        e1 = self.add_directed_edge(leftNode, rightNode, weight)
        e2 = self.add_directed_edge(rightNode, leftNode, weight)
        return (e1, e2)

    def add_directed_edge(self, leftNode, rightNode, weight=1.0):
        """Add a directed edge from leftNode to rightNode.

        When an edge between the two IDs already exists it is returned
        unchanged and ``weight`` is ignored.

        Raises:
            ValueError: If either node has not been added to the graph.
        """
        self._require_known(leftNode, rightNode)
        edge_tuple = (leftNode.id, rightNode.id)
        if edge_tuple in self.edges:
            return self.edges[edge_tuple]
        new_edge = Edge(leftNode, rightNode, weight=weight)
        self.edges[edge_tuple] = new_edge
        # The source node keeps its own list of outgoing edges.
        leftNode.edges.append(new_edge)
        return new_edge
# Build a small undirected graph of cities and explore what is reachable.
g = Graph()
n1, n2, n3, n4, n5, n6 = (
    Node(city)
    for city in ("DC", "New York", "Boston", "Albany", "Montreal", "Quebec City")
)

# Show the first five cities (the sixth is created but not printed).
for city_node in (n1, n2, n3, n4, n5):
    print(city_node)

for city_node in (n1, n2, n3, n4, n5, n6):
    g.add_node(city_node)

# Connect the cities; each call returns the pair of directed edges it created.
for left_city, right_city in ((n1, n2), (n1, n3), (n2, n4), (n2, n5), (n5, n6)):
    e = g.add_edge(left_city, right_city)
    print(e)

# Everything within three hops of DC.
for x in n1.get_reachable(3):
    print(x)

# Nodes that need exactly three hops: drop those already reachable in fewer.
n1.get_reachable(3) - n1.get_reachable(2) - n1.get_reachable(1)
Graph
Using the unittest framework, write tests for the provided Graph class.
class GraphTests(unittest.TestCase):
    """Unit tests for the Graph class."""

    def test_add_node(self):
        """Does the graph add nodes correctly?"""
        g = Graph()
        n1 = Node("DC")
        g.add_node(n1)
        self.assertIn(n1.id, g.nodes)
        self.assertIn(n1, g.nodes.values())
        self.assertEqual(len(g.nodes), 1)

    def test_add_undirected_edge(self):
        """Does the graph add undirected edges correctly?"""
        g = Graph()
        n1 = Node("DC")
        n2 = Node("New York")
        g.add_node(n1)
        g.add_node(n2)
        e = g.add_edge(n1, n2)
        # An undirected edge is stored as one directed edge per direction.
        self.assertEqual(len(e), 2)
        self.assertEqual(len(g.edges), 2)
        self.assertIn(n1, n2.get_neighbors())
        self.assertIn(n2, n1.get_neighbors())

    def test_add_directed_edge(self):
        """Does the graph add directed edges correctly?"""
        g = Graph()
        n1 = Node("DC")
        n2 = Node("New York")
        g.add_node(n1)
        g.add_node(n2)
        e = g.add_directed_edge(n1, n2)
        self.assertIs(e.left, n1)
        self.assertIs(e.right, n2)
        self.assertEqual(len(g.edges), 1)
        self.assertIn(n2, n1.get_neighbors())
        # The reverse direction must not have been created.
        self.assertNotIn(n1, n2.get_neighbors())

    def test_node_count_zero_nodes(self):
        """Is the count of nodes what we expect?"""
        g = Graph()
        self.assertEqual(len(g.nodes), 0)

    def test_node_count_one_node(self):
        """Is the count of nodes what we expect?"""
        g = Graph()
        n1 = Node("DC")
        g.add_node(n1)
        self.assertEqual(len(g.nodes), 1)

    def test_node_count_two_nodes(self):
        """Is the count of nodes what we expect?"""
        g = Graph()
        g.add_node(Node("DC"))
        g.add_node(Node("NYC"))
        self.assertEqual(len(g.nodes), 2)

    def test_node_count_three_nodes(self):
        """Is the count of nodes what we expect?"""
        g = Graph()
        g.add_node(Node("DC"))
        g.add_node(Node("NYC"))
        g.add_node(Node("Boston"))
        self.assertEqual(len(g.nodes), 3)

    def test_edge_count(self):
        """Is the count of edges what we expect?"""
        g = Graph()
        n1 = Node("DC")
        n2 = Node("New York")
        n3 = Node("Boston")
        for node in (n1, n2, n3):
            g.add_node(node)
        self.assertEqual(len(g.edges), 0)
        g.add_edge(n1, n2)
        self.assertEqual(len(g.edges), 2)
        g.add_directed_edge(n2, n3)
        self.assertEqual(len(g.edges), 3)
        # Adding an edge that already exists must not create a duplicate.
        g.add_edge(n1, n2)
        self.assertEqual(len(g.edges), 3)

    def test_cities(self):
        """A test case connecting a few cities"""
        g = Graph()
        n1 = Node("DC")
        n2 = Node("New York")
        n3 = Node("Boston")
        n4 = Node("Albany")
        n5 = Node("Montreal")
        n6 = Node("Quebec City")
        for node in (n1, n2, n3, n4, n5, n6):
            g.add_node(node)
        for left, right in ((n1, n2), (n1, n3), (n2, n4), (n2, n5), (n5, n6)):
            g.add_edge(left, right)
        self.assertEqual(len(g.nodes), 6)
        # Five undirected edges are stored as ten directed ones.
        self.assertEqual(len(g.edges), 10)
        self.assertEqual(n1.get_reachable(1), {n2, n3})
        self.assertEqual(n1.get_reachable(2), {n2, n3, n4, n5})
        self.assertEqual(n1.get_reachable(3), {n2, n3, n4, n5, n6})
# Collect every test method of GraphTests and run it with verbose output.
# loadTestsFromTestCase takes the TestCase class directly; passing a TestCase
# instance to loadTestsFromModule only found the tests by accident, through
# the instance's __class__ attribute.
suite = unittest.TestLoader().loadTestsFromTestCase(GraphTests)
unittest.TextTestRunner(verbosity=2).run(suite)
Here, we'll look at the movie dataset again, using graphs to solve the Bacon game.
import pandas as pd
# Load the co-star list. The file is read without a header row, so the two
# actor-name columns are addressed as 0 and 1 below.
df = pd.read_csv("https://dl.dropboxusercontent.com/u/66442241/adj.csv",
                 header=None)
# Create new graph
g = Graph()
# Reset the shared node counter so the actor nodes are numbered from 0
Node.node_count = 0
# Map actors to their nodes
# NOTE(review): nodes are created only for names found in column 0; a name
# that appears only in column 1 would raise KeyError in the edge loop below.
# Presumably every pair is listed in both directions - confirm against the data.
actor_to_node_dict = {}
for actor in set(df[0]):
    new_node = Node(actor)
    actor_to_node_dict[actor] = new_node
    g.add_node(new_node)
# Spot-check one entry of the mapping
x = actor_to_node_dict["Robert Downey Jr."]
print(type(x))
print(x)
# Add one edge per row of the data set. add_edge inserts both directions,
# and the graph returns the existing edge when a pair is seen again, so
# rows that repeat a pair in reverse order do not create duplicates.
for index, row in df.iterrows():
    left_actor = row[0]
    left_node = actor_to_node_dict[left_actor]
    right_actor = row[1]
    right_node = actor_to_node_dict[right_actor]
    e = g.add_edge(left_node, right_node)
# Show the direct co-stars of one actor
actor_to_node_dict["Morgan Freeman"].get_neighbors()
Our dataset doesn't actually have Kevin Bacon, but it does have Morgan Freeman, who has been in many films.
# Target actor: the node every search below tries to reach
target_actor = "Morgan Freeman"
target = actor_to_node_dict[target_actor]
# Source actor; despite the variable name this is a fixed, hand-picked value
random_actor = "Ryan Reynolds"
# Pull the actor's node from the dictionary
actor_node = actor_to_node_dict[random_actor]
print(actor_node)
# Try different distances and see if target is reachable
def distance_to_target(source, target, max_distance=9):
    """Return the smallest number of hops at which target is reachable from source.

    Distances 0, 1, ..., max_distance are tried in order and a message is
    printed for the first one whose reachable set contains the target.

    Args:
        source: Start node; must provide ``get_reachable(distance)``.
        target: Node to look for.
        max_distance: Largest distance to try (inclusive).

    Returns:
        The distance at which the target was first found, or ``None`` when it
        is not reachable within ``max_distance`` hops. Returning ``None``
        keeps "not found" distinguishable from a real distance.
    """
    for d in range(max_distance + 1):
        if target in source.get_reachable(d):
            print("Found", target, "at d:", d)
            return d
    return None
# Search outwards from the chosen source actor until the target is found
distance_to_target(actor_node, target)
Now we'll use the NetworkX package to visualize some of this graph.
import networkx as nx
# Copy the actor graph into a NetworkX graph keyed by actor name.
# Note: this rebinds g, which held the hand-written Graph until now.
g = nx.Graph()
# The loop variable reuses the name actor_node, so the node selected earlier
# is overwritten; later code refers to actors by name instead.
for actor_node in actor_to_node_dict.values():
    g.add_node(actor_node.contained)
    for n in actor_node.get_neighbors():
        g.add_edge(actor_node.contained, n.contained)
# Sanity check: compare the raw data size with the number of actors and nodes
print("DF Shape:", df.shape)
print("Unique Actors:", len(actor_to_node_dict))
print("Nodes:", len(g.nodes()))
We can use NetworkX to find the path from a source to target actor, as opposed to just the path length we have above.
IMDb has a website for this sort of thing: http://www.imdb.com/search/common
# Print the shortest path between the source actor and the target actor;
# the result is the list of actor names along the path
print(nx.shortest_path(g, source=random_actor, target=target_actor))
# Other interesting actors to try
# source="Amber Armstrong"
# source="Gavin MacLeod"
# source="Ashley Palmer"
# source="Hal Linden"
# source="Trey Parker"
print(nx.shortest_path(g, source="Amber Armstrong", target=target_actor))
NetworkX isn't really well-suited for visualization, but it does work if we have small enough data.
%matplotlib inline
import matplotlib.pyplot as plt
# Draw the sub-graph induced by the 20 actors with the most connections.
# dict(...) gives a plain name -> degree mapping whether Graph.degree()
# returns a dictionary (NetworkX 1.x) or a degree view (NetworkX 2+), so
# degree_dict.get is always available as the sort key.
degree_dict = dict(g.degree())
top_actors = sorted(degree_dict, key=degree_dict.get, reverse=True)[:20]
subg = g.subgraph(top_actors)
pos = nx.spring_layout(subg, k=0.9, iterations=100)
# The former hold=False argument is dropped: current NetworkX/Matplotlib
# releases no longer accept it.
nx.draw(subg, pos, with_labels=True)
plt.show()
Not all actors in our dataset have starred in movies that connect them to the rest of the network. Here, we investigate those cases.
# Inspect every connected component that has at least four actors.
# connected_component_subgraphs was removed from NetworkX, so each sub-graph
# is built from the node sets yielded by connected_components instead.
for component in nx.connected_components(g):
    subg = g.subgraph(component)
    node_count = len(subg.nodes())
    if node_count < 4:
        continue
    print("Number of nodes in subgraph:", node_count)
    diam = nx.diameter(subg)
    print("Shorted Path Between Most Distance Nodes:", diam)
    # Get the top 20 actors in this network
    # dict(...) works for both the dictionary and the degree-view return
    # styles of Graph.degree().
    degree_dict = dict(subg.degree())
    top_actors = sorted(degree_dict, key=degree_dict.get, reverse=True)[:20]
    subsubg = subg.subgraph(top_actors)
    pos = nx.spring_layout(subsubg, k=0.9, iterations=100)
    # Start a fresh figure per component; this replaces the former
    # hold=False argument, which is no longer accepted.
    plt.figure()
    nx.draw(subsubg, pos, with_labels=True)
    plt.show()