1.server.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import contextlib
import json
from ryu.lib import hub
from ryu.lib.hub import StreamServer
logging.basicConfig()
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
class Server(object):
def __init__(self, *args):
super(Server, self).__init__()
self.clients = {} #client = controller
self.server = StreamServer(('0.0.0.0', 8888), self._connect)
self.topo = {}
def _connect(self, socket, address):
print 'connected address:%s' % str(address)
with contextlib.closing(Client(socket)) as client:
client.server = self
client_id = len(self.clients)+1
client.set_id(client_id)
self.clients[client_id] = client
client.start()
def start(self):
hub.spawn(self.monitor)
print("Server start...")
self.server.serve_forever()
def monitor(self):
while True:
print self.topo
hub.sleep(2)
class Client(object):
def __init__(self, socket):
super(Client, self).__init__()
self.send_queue = hub.Queue(32) #controller and server send message
self.status = True
self.server = None # connect to server
self.socket = socket
self.client_id = 0
def set_id(self,client_id):
self.client_id = client_id
msg = json.dumps({
'cmd': 'set_id',
'client_id': client_id
})
self.send(msg)
def send(self,msg):
if self.send_queue:
self.send_queue.put(msg)
def send_msg(self):
try:
while self.status:
message = self.send_queue.get()
#message += '\n'
self.socket.sendall(message)
hub.sleep(0.1)
finally:# disconnect
self.send_queue = None
def rece_msg(self):
while self.status:
try:
message = self.socket.recv(128)
if len(message) == 0:
log.info("connection fail")
self.status = False
break
while '\n' != message[-1]:
message += self.socket.recv(128)
data = message.split("\n")
for temp in data:
if len(temp) == 0:
continue
msg = json.loads(temp)#analyze message
if msg['cmd'] == 'add_topo':
dst_dpid = msg['dst_dpid']
dst_port_no = msg['dst_port_no']
src_dpid = msg['src_dpid']
src_port_no = msg['src_port_no']
if (src_dpid,dst_dpid) not in self.server.topo.keys():
self.server.topo[(src_dpid,dst_dpid)] = (src_port_no,dst_port_no)
print "Add topo :",src_dpid,dst_dpid,":",src_port_no,dst_port_no
hub.sleep(0.1)
except ValueError:
print('Value error for %s, len: %d', message, len(message))
def start(self):
t1 = hub.spawn(self.send_msg)
t2 = hub.spawn(self.rece_msg)
hub.joinall([t1, t2])
def close(self):
self.status = False
self.socket.close()
def main():
Server().start()
if __name__ == '__main__':
main()
2.topo.py
#!/usr/bin/python
import sys
sys.path.append("/home/mml/mininet")
import time
import os
from mininet.net import Mininet
from mininet.node import RemoteController,OVSSwitch
from mininet.cli import CLI
from mininet.link import TCLink
from mininet.log import setLogLevel
def topology(remoteip,ofversion):
"***Create a network."
net = Mininet(controller=RemoteController,switch=OVSSwitch)
print("***Creating hosts")
h1 = net.addHost("h1",mac="00:00:00:00:00:01",ip="192.168.1.1/16")
h2 = net.addHost("h2",mac="00:00:00:00:00:02",ip="192.168.1.2/16")
h3 = net.addHost("h3",mac="00:00:00:00:00:03",ip="192.168.1.3/16")
h4 = net.addHost("h4",mac="00:00:00:00:00:04",ip="192.168.1.4/16")
print("***Creating switches")
s1 = net.addSwitch("s1",protocols=ofversion)
s2 = net.addSwitch("s2",protocols=ofversion)
s3 = net.addSwitch("s3",protocols=ofversion)
s4 = net.addSwitch("s4",protocols=ofversion)
c1 = net.addController("c1",controller=RemoteController,ip=remoteip,port=6653)
c2 = net.addController("c2",controller=RemoteController,ip=remoteip,port=6654)
print("***Create links")
#switchLinkOpts = dict(bw=10,delay="1ms")
#hostLinksOpts = dict(bw=100)
net.addLink(s1, h1, 1)
net.addLink(s2, h2, 1)
net.addLink(s3, h3, 1)
net.addLink(s4, h4, 1)
net.addLink(s1, s2, 2,2)
net.addLink(s2, s3, 3,2)
net.addLink(s3, s4, 3,2)
print("***Building network.")
net.build()
s1.start([c1])
s2.start([c1])
s3.start([c2])
s4.start([c2])
print("***Starting network")
c1.start()
c2.start()
CLI(net)
print("***Stoping network")
net.stop()
if __name__ == "__main__":
setLogLevel("info")
topology("127.0.0.1","OpenFlow13")
3.c.py
import sys
import socket
import json
sys.path.append("/home/manminglei/ryu")
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import *
from ryu.topology import api
from ryu.lib import hub
from ryu.topology.switches import LLDPPacket
class Controller(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
def __init__(self, *args, **kwargs):
super(Controller, self).__init__(*args, **kwargs)
self.topo = {}
self.status = False
self.client_id = 1
self.send_queue = hub.Queue(16)
self.socket = socket.socket()
self.start_serve('127.0.0.1', 8888)
self.time = 0
def start_serve(self, server_addr, server_port):
try:
self.socket.connect((server_addr, server_port))
self.status = True
hub.spawn(self._rece_loop)
hub.spawn(self._send_loop)
except Exception, e:
raise e
def _send_loop(self):
try:
while self.status:
message = self.send_queue.get()
message += '\n'
self.socket.sendall(message)
finally:
self.send_queue = None
def _rece_loop(self):
while self.status:
try:
message = self.socket.recv(128)
if len(message) == 0:
self.logger.info('connection fail, close')
self.status = False
break
data = message.split("\n")
for temp in data:
print temp
msg = json.loads(temp)
if msg['cmd'] == 'set_id':
self.client_id = msg['client_id']
except ValueError:
print('Value error for %s, len: %d', message, len(message))
def send(self, msg):
if self.send_queue != None:
self.send_queue.put(msg)
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def _switch_features_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
dpid = datapath.id
self.logger.info("switch %s connect to controller",datapath.id)
actions = [parser.OFPActionOutput(port=ofproto.OFPP_CONTROLLER,max_len=ofproto.OFPCML_NO_BUFFER)]
inst = [parser.OFPInstructionActions(type_=ofproto.OFPIT_APPLY_ACTIONS,actions=actions)]
mod = parser.OFPFlowMod(datapath=datapath,priority=0,match=parser.OFPMatch(),instructions=inst)
datapath.send_msg(mod)
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
try:
#LLDP handler
src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
dst_dpid, dst_port_no = datapath.id, msg.match['in_port']
self.add_topo(src_dpid,dst_dpid,src_port_no,dst_port_no)
link = api.get_link(self)
print self.topo
for i in link:
src = i.src
dst = i.dst
self.topo[(src.dpid,dst.dpid)] = (src.port_no,dst.port_no)
except LLDPPacket.LLDPUnknownFormat:
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
pkt = packet.Packet(msg.data)
eth = pkt.get_protocols(ethernet.ethernet)[0]
mac = eth.src
dpid,port = datapath.id,msg.match['in_port']
actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
data = None
if msg.buffer_id == ofproto.OFP_NO_BUFFER:
data = msg.data
out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
in_port=port, actions=actions, data=data)
datapath.send_msg(out)
def add_topo(self,src_dpid,dst_dpid,src_port_no,dst_port_no):
msg = json.dumps({
'cmd': 'add_topo',
'src_dpid': src_dpid,
'dst_dpid': dst_dpid,
'src_port_no': src_port_no,
'dst_port_no': dst_port_no
})
self.send(msg)
1.运行server文件
./server.py
2.运行两个控制器文件,第一个控制器默认监听端口6653
ryu-manager --observe-links c.py
ryu-manager --observe-links --ofp-tcp-listen-port 6654 c.py
3.运行拓扑文件
./topo.py