1.server.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import logging import contextlib import json from ryu.lib import hub from ryu.lib.hub import StreamServer logging.basicConfig() log = logging.getLogger(__name__) log.setLevel(logging.INFO) class Server(object): def __init__(self, *args): super(Server, self).__init__() self.clients = {} #client = controller self.server = StreamServer(('0.0.0.0', 8888), self._connect) self.topo = {} def _connect(self, socket, address): print 'connected address:%s' % str(address) with contextlib.closing(Client(socket)) as client: client.server = self client_id = len(self.clients)+1 client.set_id(client_id) self.clients[client_id] = client client.start() def start(self): hub.spawn(self.monitor) print("Server start...") self.server.serve_forever() def monitor(self): while True: print self.topo hub.sleep(2) class Client(object): def __init__(self, socket): super(Client, self).__init__() self.send_queue = hub.Queue(32) #controller and server send message self.status = True self.server = None # connect to server self.socket = socket self.client_id = 0 def set_id(self,client_id): self.client_id = client_id msg = json.dumps({ 'cmd': 'set_id', 'client_id': client_id }) self.send(msg) def send(self,msg): if self.send_queue: self.send_queue.put(msg) def send_msg(self): try: while self.status: message = self.send_queue.get() #message += '\n' self.socket.sendall(message) hub.sleep(0.1) finally:# disconnect self.send_queue = None def rece_msg(self): while self.status: try: message = self.socket.recv(128) if len(message) == 0: log.info("connection fail") self.status = False break while '\n' != message[-1]: message += self.socket.recv(128) data = message.split("\n") for temp in data: if len(temp) == 0: continue msg = json.loads(temp)#analyze message if msg['cmd'] == 'add_topo': dst_dpid = msg['dst_dpid'] dst_port_no = msg['dst_port_no'] src_dpid = msg['src_dpid'] src_port_no = msg['src_port_no'] if (src_dpid,dst_dpid) not in self.server.topo.keys(): self.server.topo[(src_dpid,dst_dpid)] = (src_port_no,dst_port_no) print "Add topo :",src_dpid,dst_dpid,":",src_port_no,dst_port_no hub.sleep(0.1) except ValueError: print('Value error for %s, len: %d', message, len(message)) def start(self): t1 = hub.spawn(self.send_msg) t2 = hub.spawn(self.rece_msg) hub.joinall([t1, t2]) def close(self): self.status = False self.socket.close() def main(): Server().start() if __name__ == '__main__': main()
2.topo.py
#!/usr/bin/python import sys sys.path.append("/home/mml/mininet") import time import os from mininet.net import Mininet from mininet.node import RemoteController,OVSSwitch from mininet.cli import CLI from mininet.link import TCLink from mininet.log import setLogLevel def topology(remoteip,ofversion): "***Create a network." net = Mininet(controller=RemoteController,switch=OVSSwitch) print("***Creating hosts") h1 = net.addHost("h1",mac="00:00:00:00:00:01",ip="192.168.1.1/16") h2 = net.addHost("h2",mac="00:00:00:00:00:02",ip="192.168.1.2/16") h3 = net.addHost("h3",mac="00:00:00:00:00:03",ip="192.168.1.3/16") h4 = net.addHost("h4",mac="00:00:00:00:00:04",ip="192.168.1.4/16") print("***Creating switches") s1 = net.addSwitch("s1",protocols=ofversion) s2 = net.addSwitch("s2",protocols=ofversion) s3 = net.addSwitch("s3",protocols=ofversion) s4 = net.addSwitch("s4",protocols=ofversion) c1 = net.addController("c1",controller=RemoteController,ip=remoteip,port=6653) c2 = net.addController("c2",controller=RemoteController,ip=remoteip,port=6654) print("***Create links") #switchLinkOpts = dict(bw=10,delay="1ms") #hostLinksOpts = dict(bw=100) net.addLink(s1, h1, 1) net.addLink(s2, h2, 1) net.addLink(s3, h3, 1) net.addLink(s4, h4, 1) net.addLink(s1, s2, 2,2) net.addLink(s2, s3, 3,2) net.addLink(s3, s4, 3,2) print("***Building network.") net.build() s1.start([c1]) s2.start([c1]) s3.start([c2]) s4.start([c2]) print("***Starting network") c1.start() c2.start() CLI(net) print("***Stoping network") net.stop() if __name__ == "__main__": setLogLevel("info") topology("127.0.0.1","OpenFlow13")
3.c.py
import sys import socket import json sys.path.append("/home/manminglei/ryu") from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.controller.handler import CONFIG_DISPATCHER from ryu.controller.handler import set_ev_cls from ryu.ofproto import ofproto_v1_3 from ryu.lib.packet import * from ryu.topology import api from ryu.lib import hub from ryu.topology.switches import LLDPPacket class Controller(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self, *args, **kwargs): super(Controller, self).__init__(*args, **kwargs) self.topo = {} self.status = False self.client_id = 1 self.send_queue = hub.Queue(16) self.socket = socket.socket() self.start_serve('127.0.0.1', 8888) self.time = 0 def start_serve(self, server_addr, server_port): try: self.socket.connect((server_addr, server_port)) self.status = True hub.spawn(self._rece_loop) hub.spawn(self._send_loop) except Exception, e: raise e def _send_loop(self): try: while self.status: message = self.send_queue.get() message += '\n' self.socket.sendall(message) finally: self.send_queue = None def _rece_loop(self): while self.status: try: message = self.socket.recv(128) if len(message) == 0: self.logger.info('connection fail, close') self.status = False break data = message.split("\n") for temp in data: print temp msg = json.loads(temp) if msg['cmd'] == 'set_id': self.client_id = msg['client_id'] except ValueError: print('Value error for %s, len: %d', message, len(message)) def send(self, msg): if self.send_queue != None: self.send_queue.put(msg) @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER) def _switch_features_handler(self, ev): msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto parser = datapath.ofproto_parser dpid = datapath.id self.logger.info("switch %s connect to controller",datapath.id) actions = [parser.OFPActionOutput(port=ofproto.OFPP_CONTROLLER,max_len=ofproto.OFPCML_NO_BUFFER)] inst = [parser.OFPInstructionActions(type_=ofproto.OFPIT_APPLY_ACTIONS,actions=actions)] mod = parser.OFPFlowMod(datapath=datapath,priority=0,match=parser.OFPMatch(),instructions=inst) datapath.send_msg(mod) @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) def packet_in_handler(self, ev): msg = ev.msg datapath = msg.datapath try: #LLDP handler src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data) dst_dpid, dst_port_no = datapath.id, msg.match['in_port'] self.add_topo(src_dpid,dst_dpid,src_port_no,dst_port_no) link = api.get_link(self) print self.topo for i in link: src = i.src dst = i.dst self.topo[(src.dpid,dst.dpid)] = (src.port_no,dst.port_no) except LLDPPacket.LLDPUnknownFormat: ofproto = datapath.ofproto parser = datapath.ofproto_parser pkt = packet.Packet(msg.data) eth = pkt.get_protocols(ethernet.ethernet)[0] mac = eth.src dpid,port = datapath.id,msg.match['in_port'] actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)] data = None if msg.buffer_id == ofproto.OFP_NO_BUFFER: data = msg.data out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id, in_port=port, actions=actions, data=data) datapath.send_msg(out) def add_topo(self,src_dpid,dst_dpid,src_port_no,dst_port_no): msg = json.dumps({ 'cmd': 'add_topo', 'src_dpid': src_dpid, 'dst_dpid': dst_dpid, 'src_port_no': src_port_no, 'dst_port_no': dst_port_no }) self.send(msg)
启动方法:
1.运行server文件
./server.py
2.运行两个控制器文件,第一个控制器默认监听端口6653
ryu-manager --observe-links c.py
ryu-manager --observe-links --ofp-tcp-listen-port 6654 c.py
3.运行拓扑文件
./topo.py