主页 M

Ryu多控制器通信

2020-12-07 网页编程网 网页编程网

1.server.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import contextlib
import json
from ryu.lib import hub
from ryu.lib.hub import StreamServer

logging.basicConfig()
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

class Server(object):
	def __init__(self, *args):
		super(Server, self).__init__()
		self.clients = {} #client = controller
		self.server = StreamServer(('0.0.0.0', 8888), self._connect)
		self.topo = {}
		

	def _connect(self, socket, address):
		print 'connected address:%s' % str(address)
		
		with contextlib.closing(Client(socket)) as client:
			client.server = self  
			client_id = len(self.clients)+1 

			client.set_id(client_id)
			self.clients[client_id] = client
			client.start()

	def start(self):
		hub.spawn(self.monitor)
		print("Server start...")
		self.server.serve_forever()
		
	def monitor(self):
		while True:
			print self.topo
			hub.sleep(2)

class Client(object):
	def __init__(self, socket):
		super(Client, self).__init__()
		self.send_queue = hub.Queue(32) #controller and server send message
		self.status = True
		self.server = None  # connect to server
		self.socket = socket
		self.client_id = 0

	def set_id(self,client_id):
		self.client_id = client_id
		msg = json.dumps({
			'cmd': 'set_id',
			'client_id': client_id
		})
		self.send(msg)

	def send(self,msg):
		if self.send_queue:
			self.send_queue.put(msg)

	def send_msg(self):
		try:
			while self.status:
				message = self.send_queue.get()
				#message += '\n'
				self.socket.sendall(message)
				hub.sleep(0.1)
		finally:# disconnect
			self.send_queue = None

	def rece_msg(self):
		while self.status:
			try:
				message = self.socket.recv(128)
				if len(message) == 0:
					log.info("connection fail")
					self.status = False
					break
				while '\n' != message[-1]:
					message += self.socket.recv(128)
				data = message.split("\n")
				for temp in data:
					if len(temp) == 0:
						continue
					msg = json.loads(temp)#analyze message
					if msg['cmd'] == 'add_topo':
						dst_dpid = msg['dst_dpid']
						dst_port_no = msg['dst_port_no']
						src_dpid = msg['src_dpid']
						src_port_no = msg['src_port_no']
						if (src_dpid,dst_dpid) not in self.server.topo.keys():
							self.server.topo[(src_dpid,dst_dpid)] = (src_port_no,dst_port_no)
							print "Add topo :",src_dpid,dst_dpid,":",src_port_no,dst_port_no
				hub.sleep(0.1)
			except ValueError:
				print('Value error for %s, len: %d', message, len(message))

	def start(self):
		t1 = hub.spawn(self.send_msg)
		t2 = hub.spawn(self.rece_msg)
		hub.joinall([t1, t2])

	def close(self):
		self.status = False
		self.socket.close()

def main():
	Server().start()

if __name__ == '__main__':
	main()

2.topo.py

#!/usr/bin/python

import sys
sys.path.append("/home/mml/mininet")
import time
import os
from mininet.net import Mininet
from mininet.node import RemoteController,OVSSwitch
from mininet.cli import CLI
from mininet.link import TCLink
from mininet.log import setLogLevel

def topology(remoteip,ofversion):
	
	"***Create a network."
	net = Mininet(controller=RemoteController,switch=OVSSwitch)
	
	print("***Creating hosts")
	h1 = net.addHost("h1",mac="00:00:00:00:00:01",ip="192.168.1.1/16")
	h2 = net.addHost("h2",mac="00:00:00:00:00:02",ip="192.168.1.2/16")
	h3 = net.addHost("h3",mac="00:00:00:00:00:03",ip="192.168.1.3/16")
	h4 = net.addHost("h4",mac="00:00:00:00:00:04",ip="192.168.1.4/16")
	
	print("***Creating switches")
	s1 = net.addSwitch("s1",protocols=ofversion)
	s2 = net.addSwitch("s2",protocols=ofversion)
	s3 = net.addSwitch("s3",protocols=ofversion)
	s4 = net.addSwitch("s4",protocols=ofversion)
	
	c1 = net.addController("c1",controller=RemoteController,ip=remoteip,port=6653)
	c2 = net.addController("c2",controller=RemoteController,ip=remoteip,port=6654)

	print("***Create links")
	#switchLinkOpts = dict(bw=10,delay="1ms")
	#hostLinksOpts = dict(bw=100)
	
	net.addLink(s1, h1, 1)
	net.addLink(s2, h2, 1)
	net.addLink(s3, h3, 1)
	net.addLink(s4, h4, 1)
	net.addLink(s1, s2, 2,2)
	net.addLink(s2, s3, 3,2)
	net.addLink(s3, s4, 3,2)
	

	print("***Building network.")
	net.build()
	s1.start([c1])
	s2.start([c1])
	s3.start([c2])
	s4.start([c2])
	
	print("***Starting network")
	c1.start()
	c2.start()
	CLI(net)
	
	print("***Stoping network")
	net.stop()
	
if __name__ == "__main__":
	setLogLevel("info")
	topology("127.0.0.1","OpenFlow13")

3.c.py

import sys
import socket
import json
sys.path.append("/home/manminglei/ryu")
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import *
from ryu.topology import api
from ryu.lib import hub
from ryu.topology.switches import LLDPPacket

class Controller(app_manager.RyuApp):
	OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

	def __init__(self, *args, **kwargs):
		super(Controller, self).__init__(*args, **kwargs)
		self.topo = {}
		self.status = False
		self.client_id = 1
		self.send_queue = hub.Queue(16)
		self.socket = socket.socket()
		self.start_serve('127.0.0.1', 8888)
		self.time = 0

	def start_serve(self, server_addr, server_port):
		try:
			self.socket.connect((server_addr, server_port))
			self.status = True
			hub.spawn(self._rece_loop)
			hub.spawn(self._send_loop)
		except Exception, e:
			raise e

	def _send_loop(self):
		try:
			while self.status:
				message = self.send_queue.get()
				message += '\n'
				self.socket.sendall(message)
		finally:
			self.send_queue = None

	def _rece_loop(self):
		while self.status:
			try:
				message = self.socket.recv(128)
				if len(message) == 0:
					self.logger.info('connection fail, close')
					self.status = False
					break
				data = message.split("\n")
				for temp in data:
					print temp
					msg = json.loads(temp)
					if msg['cmd'] == 'set_id':
						self.client_id = msg['client_id']

			except ValueError:
				print('Value error for %s, len: %d', message, len(message))

	def send(self, msg):
		if self.send_queue != None:
			self.send_queue.put(msg)

	@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
	def _switch_features_handler(self, ev):
		msg = ev.msg
		datapath = msg.datapath
		ofproto = datapath.ofproto
		parser = datapath.ofproto_parser
		dpid = datapath.id		
		self.logger.info("switch %s connect to controller",datapath.id)
		actions = [parser.OFPActionOutput(port=ofproto.OFPP_CONTROLLER,max_len=ofproto.OFPCML_NO_BUFFER)]
		inst = [parser.OFPInstructionActions(type_=ofproto.OFPIT_APPLY_ACTIONS,actions=actions)]
		mod = parser.OFPFlowMod(datapath=datapath,priority=0,match=parser.OFPMatch(),instructions=inst)
		datapath.send_msg(mod) 

	
	@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
	def packet_in_handler(self, ev):
		msg = ev.msg
		datapath = msg.datapath
		
		try:
			#LLDP handler
			src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
			dst_dpid, dst_port_no = datapath.id, msg.match['in_port']
			self.add_topo(src_dpid,dst_dpid,src_port_no,dst_port_no)
			link = api.get_link(self)
			print self.topo
			for i in link:
				src = i.src
				dst = i.dst
				self.topo[(src.dpid,dst.dpid)] = (src.port_no,dst.port_no)
			
		except LLDPPacket.LLDPUnknownFormat:
			ofproto = datapath.ofproto
			parser = datapath.ofproto_parser
			pkt = packet.Packet(msg.data)
			eth = pkt.get_protocols(ethernet.ethernet)[0]
			mac = eth.src
			dpid,port = datapath.id,msg.match['in_port']
			actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
			data = None
			if msg.buffer_id == ofproto.OFP_NO_BUFFER:
				data = msg.data
			out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
					in_port=port, actions=actions, data=data)
			datapath.send_msg(out)

	def add_topo(self,src_dpid,dst_dpid,src_port_no,dst_port_no):
		msg = json.dumps({
			'cmd': 'add_topo',
			'src_dpid': src_dpid,
			'dst_dpid': dst_dpid,
			'src_port_no': src_port_no,
			'dst_port_no': dst_port_no
		})
		self.send(msg)

启动方法:

1.运行server文件

./server.py

2.运行两个控制器文件,第一个控制器默认监听端口6653

ryu-manager --observe-links c.py

ryu-manager --observe-links --ofp-tcp-listen-port 6654 c.py

3.运行拓扑文件

./topo.py

阅读原文
阅读 4399
123 显示电脑版