您的位置:首页 > 其它

POX控制器下bellmanford算法的实现

2015-11-24 15:29 561 查看
bellmanford的实现如下:

[Goal]

Based on the l2_multi.py POX controller module, I wrote l2_bellmanford.py, which uses the Bellman-Ford algorithm to find a shortest path. This version is more stable than the module in OpenNetMon.

[Evaluation Environment]

We will build a topology like the one in the following figure, then use l2_bellmanford to find a shortest path between hosts.

[img=http://img.bbs.csdn.net/upload/201511/24/1448349378_787849.gif][/img]

[Steps]

1. Create the evaluation topology.

[img=http://img.bbs.csdn.net/upload/201511/24/1448349419_852502.jpg][/img]

2. Prepare l2_bellmanford.py and save the file under the /pox/ext folder.

from pox.core import core

import pox.openflow.libopenflow_01 as of

from pox.lib.revent import *

from pox.lib.recoco import Timer

from collections import defaultdict

from pox.openflow.discovery import Discovery

from pox.lib.util import dpid_to_str

import time

# Module-level logger obtained from the POX core.
log = core.getLogger()

# Adjacency map.  [sw1][sw2] -> port from sw1 to sw2
# Both levels are defaultdicts, so indexing a missing [sw1][sw2] yields None
# (and, as with any defaultdict, stores that None entry as a side effect).
adjacency = defaultdict(lambda:defaultdict(lambda:None))

# Switches we know of.  [dpid] -> Switch
switches = {}

# ethaddr -> (switch, port)
mac_map = {}

# Waiting path.  (dpid,xid)->WaitingPath
waiting_paths = {}

# Time to not flood in seconds (compared against time.time() deltas)
FLOOD_HOLDDOWN = 5

# Flow timeouts (copied into each flow_mod's idle_timeout / hard_timeout)
FLOW_IDLE_TIMEOUT = 10
FLOW_HARD_TIMEOUT = 30

# How long is allowable to set up a path?  (seconds, added to time.time())
PATH_SETUP_TIME = 4

def _get_raw_path (src,dst):
    """
    Find a shortest switch-level path from src to dst with Bellman-Ford.

    Every link recorded in the global adjacency map is given weight 1, so
    the result is a minimum-hop path over the switches currently stored in
    the global switches table.

    src, dst -- the nodes used as keys of adjacency (the values of switches)

    Returns the list of nodes from src to dst inclusive, or None when dst
    cannot be reached from src.
    """
    if src == dst:
        return [src]

    nodes = list(switches.values())
    infinity = float("inf")

    distance = dict.fromkeys(nodes, infinity)
    previous = dict.fromkeys(nodes)
    distance[src] = 0

    # Collect the directed edges once.  .get() is used on purpose: indexing
    # the nested defaultdicts would insert a None entry for every pair of
    # switches each time a path is computed.
    edges = []
    for p in nodes:
        neighbours = adjacency.get(p)
        if not neighbours:
            continue
        for q in nodes:
            if neighbours.get(q) is not None:
                edges.append((p, q))

    # Relax every edge up to |V|-1 times; stop early once a full pass
    # changes nothing, since later passes could not change anything either.
    for _ in range(len(nodes) - 1):
        changed = False
        for p, q in edges:
            candidate = distance[p] + 1
            if candidate < distance[q]:
                distance[q] = candidate
                previous[q] = p
                changed = True
        if not changed:
            break

    # An untouched distance means no chain of links leads to dst.
    if distance.get(dst, infinity) == infinity:
        return None

    # Walk the predecessor chain back from dst to src, then flip it.
    path = [dst]
    node = dst
    while node != src:
        node = previous[node]
        path.append(node)
    path.reverse()
    return path

def _check_path (p):
    """
    Verify that a cooked path is a chain of genuinely connected ports.

    p is a list of (node, in_port, out_port) hops.  For each consecutive
    pair of hops, the first hop's out_port must be the port the adjacency
    map records towards the second node, and the second hop's in_port must
    be the port recorded back towards the first node.

    Returns True when every pair agrees with adjacency (a path with fewer
    than two hops is trivially valid), otherwise False.
    """
    return all(
        adjacency[cur[0]][nxt[0]] == cur[2]
        and adjacency[nxt[0]][cur[0]] == nxt[1]
        for cur, nxt in zip(p, p[1:])
    )

def _get_path (src, dst, first_port, final_port):
    """
    Gets a cooked path -- a list of (node,in_port,out_port)

    src, dst    -- first and last switch of the path
    first_port  -- port on src where the packet entered
    final_port  -- port on dst where the packet must leave

    Returns the list of hops, or None when there is no route from src to
    dst.  Raises AssertionError if the assembled hops disagree with the
    adjacency map.
    """
    # Start with a raw path...
    if src == dst:
        path = [src]
    else:
        path = _get_raw_path(src, dst)
        # A usable raw path runs from src to dst.  Anything else (None, an
        # empty list, or a chain that never got back to src) means the
        # destination is unreachable, so report "no path" rather than
        # cooking a bogus one.
        if not path or path[0] != src or path[-1] != dst:
            return None

    log.debug("Raw path %s -> %s: %s", src, dst, path)

    # Now add the ports
    r = []
    in_port = first_port
    for s1,s2 in zip(path[:-1],path[1:]):
        out_port = adjacency[s1][s2]
        r.append((s1,in_port,out_port))
        # The next hop receives on its port facing back towards s1.
        in_port = adjacency[s2][s1]
    r.append((dst,in_port,final_port))

    assert _check_path(r), "Illegal path!"

    return r

class WaitingPath (object):
    """
    Bookkeeping for a path whose flow entries have been sent to the
    switches but whose barrier replies have not all come back yet.
    """

    def __init__ (self, path, packet):
        """
        path   -- cooked path; path[0][0] is the first switch
        packet -- something that can be sent in a packet_out once the path
                  is in place (may be None)

        Creating an instance also sweeps expired entries out of the global
        waiting_paths table when it holds more than 1000 entries.
        """
        self.path = path
        self.packet = packet
        self.xids = set()  # outstanding (dpid, xid) barrier requests
        self.first_switch = path[0][0].dpid
        self.expires_at = time.time() + PATH_SETUP_TIME

        if len(waiting_paths) > 1000:
            WaitingPath.expire_waiting_paths()

    def add_xid (self, dpid, xid):
        """
        Record an outstanding barrier request and index this object under
        it in the global waiting_paths table.
        """
        key = (dpid, xid)
        self.xids.add(key)
        waiting_paths[key] = self

    @property
    def is_expired (self):
        """
        True once the current time has reached expires_at.
        """
        return self.expires_at <= time.time()

    def notify (self, event):
        """
        Called when a barrier has been received

        Removes the matching (dpid, xid).  When none are left, the held
        packet (if any) is sent to the first switch with an output action
        to OFPP_TABLE and PathInstalled is raised on core.l2_multi.
        """
        self.xids.discard((event.dpid, event.xid))
        if self.xids:
            # Still waiting on other barriers.
            return

        # Done!
        if self.packet:
            log.debug("Sending delayed packet out %s"
                      % (dpid_to_str(self.first_switch),))
            out_action = of.ofp_action_output(port=of.OFPP_TABLE)
            msg = of.ofp_packet_out(data=self.packet, action=out_action)
            core.openflow.sendToDPID(self.first_switch, msg)

        core.l2_multi.raiseEvent(PathInstalled(self.path))

    @staticmethod
    def expire_waiting_paths ():
        """
        Drop every expired WaitingPath from the global waiting_paths table
        and log an error with the number dropped, if any.
        """
        expired = [wp for wp in set(waiting_paths.values()) if wp.is_expired]
        for wp in expired:
            for key in wp.xids:
                waiting_paths.pop(key, None)
        if expired:
            log.error("%i paths failed to install" % (len(expired),))

class PathInstalled (Event):
    """
    Fired when a path is installed

    The path attribute carries the path object that was handed to the
    constructor.
    """
    def __init__ (self, path):
        Event.__init__(self)
        self.path = path

class Switch (EventMixin):
    """
    Controller-side state for one switch: its connection, dpid and ports,
    plus the PacketIn handling that learns MAC locations and installs
    paths.
    """
    def __init__ (self):
        self.connection = None     # current connection, None while disconnected
        self.ports = None          # taken from connection.features.ports in connect()
        self.dpid = None           # taken from connection.dpid in connect()
        self._listeners = None     # handle returned by listenTo(), used to unhook
        self._connected_at = None  # time.time() of the latest connect()

    def __repr__ (self):
        return dpid_to_str(self.dpid)

    def _install (self, switch, in_port, out_port, match, buf = None):
        """
        Send one flow_mod to switch: packets matching match on in_port are
        output on out_port, using the module's idle/hard timeouts.

        buf is stored as the message's buffer_id.
        """
        msg = of.ofp_flow_mod()
        msg.match = match
        # NOTE(review): this presumably also sets in_port on the caller's
        # match object, if msg.match keeps a reference rather than a copy.
        msg.match.in_port = in_port
        msg.idle_timeout = FLOW_IDLE_TIMEOUT
        msg.hard_timeout = FLOW_HARD_TIMEOUT
        msg.actions.append(of.ofp_action_output(port = out_port))
        msg.buffer_id = buf
        switch.connection.send(msg)

    def _install_path (self, p, match, packet_in=None):
        """
        Install match on every hop of the cooked path p.

        Each hop gets a flow_mod followed by a barrier request; the barrier
        xids are registered on a WaitingPath, which holds packet_in.
        """
        wp = WaitingPath(p, packet_in)
        for sw,in_port,out_port in p:
            self._install(sw, in_port, out_port, match)
            msg = of.ofp_barrier_request()
            sw.connection.send(msg)
            wp.add_xid(sw.dpid,msg.xid)

    def install_path (self, dst_sw, last_port, match, event):
        """
        Attempts to install a path between this switch and some destination

        dst_sw    -- switch the destination is attached to
        last_port -- port on dst_sw leading to the destination
        match     -- match used for the forward flows (flipped for the
                     reverse direction)
        event     -- the PacketIn event that triggered the installation

        When no path exists a warning is logged and, for IPv4 packets, an
        ICMP destination-unreachable message is sent back out of the port
        the packet arrived on.
        """
        p = _get_path(self, dst_sw, event.port, last_port)
        if p is None:
            log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)

            import pox.lib.packet as pkt

            if (match.dl_type == pkt.ethernet.IP_TYPE and
                event.parsed.find('ipv4')):
                # It's IP -- let's send a destination unreachable
                log.debug("Dest unreachable (%s -> %s)",
                          match.dl_src, match.dl_dst)

                from pox.lib.addresses import EthAddr
                e = pkt.ethernet()
                e.src = EthAddr(dpid_to_str(self.dpid)) #FIXME: Hmm...
                e.dst = match.dl_src
                e.type = e.IP_TYPE
                ipp = pkt.ipv4()
                ipp.protocol = ipp.ICMP_PROTOCOL
                ipp.srcip = match.nw_dst #FIXME: Ridiculous
                ipp.dstip = match.nw_src
                icmp = pkt.icmp()
                icmp.type = pkt.ICMP.TYPE_DEST_UNREACH
                icmp.code = pkt.ICMP.CODE_UNREACH_HOST
                # Payload: the original IP header plus the first 8 bytes
                # that follow it, prefixed by four zero bytes.
                orig_ip = event.parsed.find('ipv4')

                d = orig_ip.pack()
                d = d[:orig_ip.hl * 4 + 8]
                import struct
                d = struct.pack("!HH", 0,0) + d #FIXME: MTU
                icmp.payload = d
                ipp.payload = icmp
                e.payload = ipp
                msg = of.ofp_packet_out()
                msg.actions.append(of.ofp_action_output(port = event.port))
                msg.data = e.pack()
                self.connection.send(msg)

            return

        log.debug("Installing path for %s -> %s %04x (%i hops)",
            match.dl_src, match.dl_dst, match.dl_type, len(p))

        # We have a path -- install it
        self._install_path(p, match, event.ofp)

        # Now reverse it and install it backwards
        # (we'll just assume that will work)
        # Swapping in/out ports per hop is enough; the hop order itself is
        # left as is.
        p = [(sw,out_port,in_port) for sw,in_port,out_port in p]
        self._install_path(p, match.flip())

    def _handle_PacketIn (self, event):
        """
        Handle a packet sent up by this switch: drop LLDP, learn or update
        the sender's location in mac_map, then flood the packet or install
        a path to its destination.
        """
        def flood ():
            """ Floods the packet """
            # NOTE(review): the message says flooding is skipped, but the
            # packet-out below is still sent while holddown is active --
            # confirm whether an early return was intended here.
            if self.is_holding_down:
                log.warning("Not flooding -- holddown active")
            msg = of.ofp_packet_out()
            # OFPP_FLOOD is optional; some switches may need OFPP_ALL
            msg.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))
            msg.buffer_id = event.ofp.buffer_id
            msg.in_port = event.port
            self.connection.send(msg)

        def drop ():
            # Kill the buffer
            # (a packet_out carrying the buffer id and no actions)
            if event.ofp.buffer_id is not None:
                msg = of.ofp_packet_out()
                msg.buffer_id = event.ofp.buffer_id
                event.ofp.buffer_id = None # Mark is dead
                msg.in_port = event.port
                self.connection.send(msg)

        packet = event.parsed

        loc = (self, event.port) # Place we saw this ethaddr
        oldloc = mac_map.get(packet.src) # Place we last saw this ethaddr

        if packet.effective_ethertype == packet.LLDP_TYPE:
            drop()
            return

        if oldloc is None:
            if packet.src.is_multicast == False:
                mac_map[packet.src] = loc # Learn position for ethaddr
                log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])
        elif oldloc != loc:
            # ethaddr seen at different place!
            if core.openflow_discovery.is_edge_port(loc[0].dpid, loc[1]):
                # New place is another "plain" port (probably)
                log.debug("%s moved from %s.%i to %s.%i?", packet.src,
                          dpid_to_str(oldloc[0].dpid), oldloc[1],
                          dpid_to_str(   loc[0].dpid),    loc[1])
                if packet.src.is_multicast == False:
                    mac_map[packet.src] = loc # Learn position for ethaddr
                    log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])
            elif packet.dst.is_multicast == False:
                # New place is a switch-to-switch port!
                # Hopefully, this is a packet we're flooding because we didn't
                # know the destination, and not because it's somehow not on a
                # path that we expect it to be on.
                # If spanning_tree is running, we might check that this port is
                # on the spanning tree (it should be).
                if packet.dst in mac_map:
                    # Unfortunately, we know the destination.  It's possible that
                    # we learned it while it was in flight, but it's also possible
                    # that something has gone wrong.
                    log.warning("Packet from %s to known destination %s arrived "
                                "at %s.%i without flow", packet.src, packet.dst,
                                dpid_to_str(self.dpid), event.port)

        # Forwarding decision: multicast and unknown destinations are
        # flooded, known unicast destinations get a path installed.
        if packet.dst.is_multicast:
            log.debug("Flood multicast from %s", packet.src)
            flood()
        else:
            if packet.dst not in mac_map:
                log.debug("%s unknown -- flooding" % (packet.dst,))
                flood()
            else:
                dest = mac_map[packet.dst]
                match = of.ofp_match.from_packet(packet)
                self.install_path(dest[0], dest[1], match, event)

    def disconnect (self):
        """
        Detach from the current connection, if any, and remove the
        listeners registered on it.
        """
        if self.connection is not None:
            log.debug("Disconnect %s" % (self.connection,))
            self.connection.removeListeners(self._listeners)
            self.connection = None
            self._listeners = None

    def connect (self, connection):
        """
        Attach this switch to connection, replacing any previous one, and
        restart the flood holddown clock.

        dpid and ports are filled in from the connection the first time
        only; a later connection with a different dpid trips the assert.
        """
        if self.dpid is None:
            self.dpid = connection.dpid
        assert self.dpid == connection.dpid
        if self.ports is None:
            self.ports = connection.features.ports
        self.disconnect()
        log.debug("Connect %s" % (connection,))
        self.connection = connection
        self._listeners = self.listenTo(connection)
        self._connected_at = time.time()

    @property
    def is_holding_down (self):
        """
        True until more than FLOOD_HOLDDOWN seconds have passed since the
        latest connect(); also True if the switch never connected.
        """
        if self._connected_at is None: return True
        if time.time() - self._connected_at > FLOOD_HOLDDOWN:
            return False
        return True

    def _handle_ConnectionDown (self, event):
        self.disconnect()

class l2_multi (EventMixin):
    """
    Component that tracks switches and inter-switch links and dispatches
    barrier replies to the paths waiting on them.  Raises PathInstalled.
    """

    _eventMixin_events = set([PathInstalled])

    def __init__ (self):
        # Hook up to openflow and discovery once both are available.
        def startup ():
            core.openflow.addListeners(self, priority=0)
            core.openflow_discovery.addListeners(self)
        core.call_when_ready(startup, ('openflow','openflow_discovery'))

    def _handle_LinkEvent (self, event):
        """
        Keep the adjacency map in step with discovery's link reports.

        Every link change first deletes all flows on every connected
        switch.  A removed link is forgotten and, if discovery knows
        another bidirectional link between the same two switches, replaced
        by it.  An added link is recorded when the two switches were not
        yet connected and the link exists in both directions; MACs learned
        on either end port of an added link are then unlearned.
        """
        def reversed_link (link):
            return Discovery.Link(link[2], link[3], link[0], link[1])

        link = event.link
        sw1 = switches[link.dpid1]
        sw2 = switches[link.dpid2]

        # Invalidate all flows: a new link may offer a better path and a
        # lost link may have broken an installed one.
        #NOTE: This could be radically improved! (e.g., not *ALL* paths break)
        clear = of.ofp_flow_mod(command=of.OFPFC_DELETE)
        for sw in switches.values():
            if sw.connection is not None:
                sw.connection.send(clear)

        discovered = core.openflow_discovery.adjacency

        if event.removed:
            # This link is no longer okay
            adjacency[sw1].pop(sw2, None)
            adjacency[sw2].pop(sw1, None)

            # But maybe there's another way to connect these...
            for other in discovered:
                same_pair = (other.dpid1 == link.dpid1 and
                             other.dpid2 == link.dpid2)
                if same_pair and reversed_link(other) in discovered:
                    # Link goes both ways -- use it as the replacement
                    adjacency[sw1][sw2] = other.port1
                    adjacency[sw2][sw1] = other.port2
                    break
            return

        # Link up.  Only interesting if the two switches are not already
        # considered connected and the link exists in both directions.
        if adjacency[sw1][sw2] is None and reversed_link(link) in discovered:
            adjacency[sw1][sw2] = link.port1
            adjacency[sw2][sw1] = link.port2

        # A MAC learned on a port that turns out to face another switch
        # is not a host location -- unlearn it.
        stale = set(
            mac for mac, (sw, port) in mac_map.items()
            if (sw is sw1 and port == link.port1)
            or (sw is sw2 and port == link.port2)
        )
        for mac in stale:
            log.debug("Unlearned %s", mac)
            del mac_map[mac]

    def _handle_ConnectionUp (self, event):
        """
        Create the Switch record on first contact, then (re)attach it to
        the new connection.
        """
        sw = switches.get(event.dpid)
        if sw is None:
            # New switch
            sw = switches[event.dpid] = Switch()
        sw.connect(event.connection)

    def _handle_BarrierIn (self, event):
        """
        Hand a barrier reply to the WaitingPath registered for it, if any.
        """
        wp = waiting_paths.pop((event.dpid, event.xid), None)
        if wp:
            wp.notify(event)

def launch ():
    """
    POX entry point: register the l2_multi component and start a
    recurring timer that sweeps expired waiting paths.
    """
    core.registerNew(l2_multi)

    # Twice the path setup time (but at least 10), capped at 15.
    sweep_interval = min(2 * max(PATH_SETUP_TIME, 5), 15)
    Timer(sweep_interval, WaitingPath.expire_waiting_paths, recurring=True)


3. Open another terminal, kill the default controller, and run the l2_bellmanford module of the POX controller.

[img=http://img.bbs.csdn.net/upload/201511/24/1448349543_196612.jpg][/img]

4. Ping tests from h1 to h3. We can see that the paths are built as 00-00-00-00-00-03 (S3) -> 00-00-00-00-00-02 (S2) -> 00-00-00-00-00-04 (S4), or as 00-00-00-00-00-04 (S4) -> 00-00-00-00-00-02 (S2) -> 00-00-00-00-00-03 (S3), for IP and ARP packet transmission.

[img=http://img.bbs.csdn.net/upload/201511/24/1448349636_850090.jpg][/img]

5. Ping tests from h1 to h8. The paths are built from s3-s2-s1-s5-s7 or vice versa.

[img=http://img.bbs.csdn.net/upload/201511/24/1448349661_644776.jpg][/img]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: