MyException - 我的异常网
当前位置:我的异常网» 软件架构设计 » 从零开始构建自己的RPC框架

从零开始构建自己的RPC框架

www.MyException.Cn  网友分享于:2013-09-28  浏览:0次
从0开始构建自己的RPC框架

1.介绍

现在都喜欢用什么高大上的东西,弄出一堆框架来。RPC即为远程过程调用协议,让两个终端之间不需再关注网络传输的实现。

这里以实现简单聊天室为目的,一步一步搭建属于自己的RPC架构。这里起名为everyw,意为eyerywhere,在任何地方都可以使用。

服务器,提供服务者;客户端,使用服务者。

 

2.服务端功能

聊天服务器最基本的服务就是注册用户,发送聊天信息,这里新建一个表示服务器功能的接口

 

/**
 * 服务器功能
 */
public interface Api
{
	/**
	 * 注册用户
	 * @param name
	 * @return
	 */
	Message<Void> regist(String name);

	/**
	 * 发送聊天信息
	 * @param message
	 */
	Message<Void> sendMessage(String message);
}

 为了显现服务器网络连通等出现的异常,这里提供了一个类似Optional的类Message

 

 

/**
 * 消息体封装类
 */
public class Message<T> implements java.io.Serializable
{ 
	private T value;
	private String error;
	private boolean hashError;
	public	Message( )
	{  
	}
	public	Message(T obj)
	{
		this.value=obj;
	}
	public	Message(T obj,String error)
	{
		this.value=obj;
		this.error=error;
		hashError=true;
	}
	
	/**
	 * 是否有异常
	 * 
	 * @return
	 */
	public boolean hasError()
	{
		return hashError;
	}

	/**
	 * 返回默认信息
	 * 
	 * @return
	 */
	public String getError()
	{
		return error;
	}
	/**
	 * 有网络异常时调用
	 * @param action
	 * @return
	 */
	public Message<T> error(Consumer<String> action)
	{
		if(hashError)
		{
			action.accept(error);
		} 
		return this;
	}
	/**
	 * 成功返回时调用
	 * @param action
	 * @return
	 */
	public Message<T> success(Consumer<Void> action)
	{
		if(error==null)
		{
			action.accept(null);
		} 
		return this;
	}
	/**
	 * 有返回值时返回
	 * @param action
	 * @return
	 */
	public Message<T> value(Consumer<T> action)
	{
		if(value!=null)
		{
			action.accept(value);
		}
		return this;
	}
	public T getValue()
	{
		return value;
	}
	/**
	 * @param error
	 */
	public void setError(String error)
	{
		this.error=error;
		hashError=true;
	}

	 
}

为了让Message可以传输,添加了java.io.Serializable接口。

 

 

3.客户端能力

客户端有时需要向服务器展现自己的能力,方便服务器调用。一个简单的聊天室就具有接收消息,处理消息的能力。这里新建一个表示客户端能力的接口

 

/**
 * 客户端能力
 */
public interface Listener
{
	/**
	 * 接收到消息
	 */
	void onMessage(String msg);
}

 

 

5.客户端实现

作为一个客户端,除了需要知道服务器的地址,就不需要关注其具体通信了。

 

public static void main(String[] args)
	{
		String url = "127.0.0.1:8080";
		ApplicationContext context = ApplicationContext.getContext(url);
		Api api = context.getService(Api.class);
		Listener listener = message -> {
			System.out.println("消息:" + message);
		};
		context.registListener(listener);
		try (Scanner br = new Scanner(System.in))
		{
			System.out.println("请输入你的名字:");
			String line = br.nextLine();
			api.regist(line).error(e -> {
				System.err.println("注册失败:" + e);
				System.exit(1);
			});
			while (true)
			{
				line = br.nextLine();
				api.sendMessage(line);
			}
		}
	}

 代码量很少很简洁,也无任何网络通信的代码。

 

ApplicationContext 将是重点要实现的内容,这里通过getContext获取单例应用上下文,getService获取具体服务实现,registListener则是向服务展现客户端能力。

 

6.服务端实现

首先需要服务的实现者,任何类只需要实现服务Api接口即可。

/**
	 * 服务实现者
	 */
	public static class ServerWoker implements Api
	{
		private ClientContext client;
		private   String name;

		/**
		 * 
		 * @param client
		 *            当前客户端上下文
		 */
		public ServerWoker(ClientContext client)
		{
			this.client = client;
		}

		@Override
		public Message<Void> regist(String name)
		{
			Logger.d(client +"注册用户名:"+name);
			this.name = name;
			List<ClientContext> clients = client.getServerContext().getClientContexts();
			if (clients != null)
			{
				for (ClientContext c : clients)
				{
					if (name.equals(c.getSession(true).get("name")))
					{
						return new Message<Void>(null, "用户名已存在");
					}
				}
			} 
			// 储存用户名
			client.getSession(true).put("name", name);
			return new Message<Void>();
		}

		@Override
		public Message<Void> sendMessage(String message)
		{
			Logger.d("客户端["+client +"]发送消息:"+name+":"+message);
			List<ClientContext> clients = client.getServerContext().getClientContexts();
			if (clients != null)
			{
				clients.forEach(c -> {
					Listener listener = c.getListener(Listener.class);
					if (listener != null)
					{
						listener.onMessage(name + ":" + message);
					}
				});
			}
			return new Message<Void>();
		}

	}

 业务逻辑也非常的清晰明了。

服务端启动代码也非常简洁容易明白。

public static void main(String[] args)
	{
		//创建服务上下文
		ServerContext context = ServerContext.create(8080);
		//为每个客户端绑定能力
		context.bindService(client -> {
			return new ServerWoker(client);
		});
		context.start();// 启动服务
	}

 

这里ServerContext为服务器上下文,包括了创建、启动服务,绑定能力,获取所有客户端上下文等功能。

ClientContext客户端上下文,可以获取使用获取客户端能力,保存使用客户信息等功能。

 

7.项目组织结构

在实现具体代码之前,先看一下项目结构,如下



 依赖关系图,

 
 

8.客户端架构具体实现过程

ApplicationContext首先需要通过socket连接,和创建一个读取线程

 

private ApplicationContext(String url)
	{

		int i = url.lastIndexOf(":");
		if (i == -1)
		{
			this.url = url;
			this.port = 80;
		} else
		{
			this.url = url.substring(0, i);
			this.port = Integer.parseInt(url.substring(i + 1));
		}
		isStart = false;
		listeners=new ArrayList<>(); 
		map=new HashMap<>();
	}
/**
	 * 启动连接 
	 * @throws IOException @throws
	 */
	private synchronized void start()
	{
		try
		{
			socket = new Socket(url, port);
			writer = new DataOutputStream(socket.getOutputStream());
			reader = new DataInputStream(socket.getInputStream());
			new ReadWorker().start();
			isStart = true;
		} catch (IOException e)
		{
			e.printStackTrace();
		}
	}
 

 

ApplicationContext#getService 传入的是Class对象,并不是具体实现对象。Proxy 提供用于创建动态代理类和实例的静态方法,它还是由这些方法创建的所有动态代理类的超类,通过代理反射的方式可以将抽象接口在运行时转变为具体实现类。

 

	/**
	 * 获取api服务
	 * 
	 * @param clz
	 * @param url
	 * @return
	 */
	public <T> T getService(Class<T> clz)
	{
		if (!isStart)
		{
			start();
		}
		InvocationHandler handler = new InvocationHandler()
		{

			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Exception
			{
				try
				{
					// 向服务端发送数据
					long bid=System.currentTimeMillis();//消息块id
					StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";mth="
							+ method.getName()+";type="+method.getGenericReturnType() + ";length=" + args.length + ";bid="+bid+";");
					Logger.d(head);
					if (args.length != 0)
					{
						head.append(Object2String(args));
					}
					writer.writeUTF(head.toString());
					writer.flush();
					Logger.d("发送完消息");
					//等待来接服务器的消息
					String data =getAndwait(bid);
					Logger.d("接收消息");
					return String2Object(data);
					// 服务端接收数据
				} catch (Throwable ex)
				{
					ex.printStackTrace();
					Message result = (Message) method.getReturnType().newInstance();
					result.setError(ex.getMessage());
					return result;
				}
			}
		};
		return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[]
		{ clz }, handler);
	} 
 

 

将请求方法,参数封装为协议,通过DataOutputStream将实际请求通过发送出去,并在读取线程中从DataInputStream获取响应数据并转换为返回对象。

Object2String,String2Object这里采用ObjectStream的方式进行转换。

 

/**
	 * 转换为Object
	 * 
	 * @param data
	 * @return
	 * @throws IOException
	 * @throws ClassNotFoundException
	 */
	public static Object String2Object(String data) throws IOException, ClassNotFoundException
	{
		byte[] bytes = Base64.getDecoder().decode(data);
		ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(bytes));
		return oi.readObject();
	}

	/**
	 * 转换为字符串
	 * 
	 * @param obj
	 * @return
	 * @throws IOException 
	 */
	public static String Object2String(Object[] objs) throws IOException
	{
		return Base64.getEncoder().encodeToString(Object2Bytes(objs));
	}

	/**
	 * 转换为byte数组
	 * 
	 * @param obj
	 * @return
	 * @throws IOException
	 */
	public static byte[] Object2Bytes(Object[] objs) throws IOException
	{
		ByteArrayOutputStream bot = new ByteArrayOutputStream(); 
		ObjectOutputStream ot = new ObjectOutputStream(bot);
		for (int i = 0; i < objs.length; i++)
		{
			ot.writeObject(objs[i]);
		} 
		return bot.toByteArray();
	}

 

 

registListener保存注册器,以等待服务器的调用

 

/**
	 * 注册能力
	 */
	public void registListener(Object listener)
	{
		listeners.add(listener);
		Class  clz = listener.getClass();
		long bid=System.currentTimeMillis();//消息块id
		StringBuffer head = new StringBuffer("clz=" + clz.getName()  + ";bid="+bid+";");
		try
		{
			writer.writeUTF(head.toString());
			writer.flush();
		} catch (IOException e)
		{
			e.printStackTrace();
		} 
	}
 

 

 

最后客户端读取线程读取数据,根据不同实现分发功能

 

		public void run()
		{
			String line=null;
			try
			{
				while((line=reader.readUTF())!=null)
				{ 
					String[] datas = line.split(";"); 
					if(datas.length==2)//服务端能力回调
					{
						long bid=Long.parseLong(datas[0].split("=")[1]);
						map.put(bid, datas[1]);
						Logger.d("服务端能力回调:"+bid); 
						synchronized(lock)
						{
							lock.notifyAll();
						} 
					}else//服务器调用客户端能力
					{
						String cls = datas[0].split("=")[1];
						Class clz = Class.forName(cls);
						String mth = datas[1].split("=")[1];
						String type = datas[2].split("=")[1];
						int alen = Integer.parseInt(datas[3].split("=")[1]);
						long bid = Long.parseLong(datas[4].split("=")[1]);
						Logger.d("调用客户端能力:"+datas[0]+";"+datas[1]);
						Object[] args = null;
						if (alen > 0)
						{
							args = String2Objects(datas[5], alen);
						}
						for (Object listener : listeners)
						{ 
							Class lcz = listener.getClass();
							if (clz.isAssignableFrom(lcz))
							{  
								try
								{
									Method ath = findMethod(lcz, mth, type, alen);
									Logger.d("找到能力方法:"+ath);
									ath.invoke(listener, args);
								} catch (IllegalAccessException | IllegalArgumentException
										| InvocationTargetException e)
								{
									e.printStackTrace();
								}
							} 
						}
					}
				}
			} catch (IOException | ClassNotFoundException e)
			{
				e.printStackTrace();
			}
		}
 

 

9.服务端架构具体实现过程

首先服务器开启ServerSocket监听,处理来接客户端的连接。

@Override
	public void run()
	{
		Logger.d("正在启动服务");
		try (ServerSocket server = new ServerSocket(port);)
		{
			while (true)
			{
				Socket socket = server.accept();
				Logger.d("新客户端连接");
				ClientContext client = new ClientContext(this, socket,funs);
				clients.add(client); 
			}
		} catch (IOException e)
		{
			e.printStackTrace();
		}
	}

 

bindService保存服务具体实现对象,连接时以传递给每个新客户端上下文

/**
	 * 暴露服务给客户端
	 * 
	 * @param fun
	 */
	public void bindService(Function<ClientContext, Object> fun)
	{
		this.funs.add(fun);
	}

 

ClientContext客户端上下文,包括了处理读写信息的转换,服务能力的回调,创建读取处理线程等

/**
	 * @param serverContext
	 * @param socket
	 * @param funs
	 * @throws IOException
	 */
	ClientContext(ServerContext serverContext, Socket socket, List<Function<ClientContext, Object>> funs)
			throws IOException
	{
		this.serverContext = serverContext;
		this.socket = socket;
		writer = new DataOutputStream(socket.getOutputStream());
		reader = new DataInputStream(socket.getInputStream());
		// 初始化能力
		apis = new ArrayList<>();
		for (Function<ClientContext, Object> fun : funs)
		{
			Object api = fun.apply(this);
			apis.add(api);
		}
		new ReadWoker().start();
	}

 

getListener获取客户端能力,然服务端并没有客户端具体的实现对象,同样是通过代理方式实现远程的调用

/**
	 * 返回客户端能力
	 * 
	 * @param <T>
	 * @param clz
	 */
	public <T> T getListener(Class<T> clz)
	{
		Logger.d("返回能力");
		// 检验客户端是否有这能力

		InvocationHandler handler = new InvocationHandler()
		{
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
			{
				// 向客户端发送数据
				Logger.d("向客户端发送消息");
				long bid = System.currentTimeMillis();// 消息块id
				StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";mth=" + method.getName() + ";type="
						+ method.getGenericReturnType() + ";length=" + args.length + ";bid=" + bid + ";");
				Logger.d(head);
				if (args.length != 0)
				{
					head.append(Object2String(args));
				}
				writer.writeUTF(head.toString());
				writer.flush();
				// 为了简化,客户端能力不返回数据
				return null;
			}
		};

		return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[]
		{ clz }, handler);

	}

 

最后在读取线程里,分发处理来自客户端不同的请求协议,调用服务端功能或者注册客户端能力

		@Override
		public void run()
		{
			try
			{
				String line = null;
				while ((line = reader.readUTF()) != null)
				{
					Logger.d("接收指令");
					String[] datas = line.split(";");
					String cls = datas[0].split("=")[1];
					if (datas.length >= 4)// 调用服务功能
					{
						Class clz = Class.forName(cls);
						String mth = datas[1].split("=")[1];
						String type = datas[2].split("=")[1];
						int alen = Integer.parseInt(datas[3].split("=")[1]);
						long bid = Long.parseLong(datas[4].split("=")[1]);
						Logger.d("调用服务器功能:" + datas[0] + ";" + datas[1] + ";" + datas[2] + ";" + datas[3] + ";"
								+ datas[3]);
						Object[] args = null;
						if (alen > 0)
						{
							args = String2Objects(datas[5], alen);
						}
						for (Object api : apis)
						{ 
							Class apz = api.getClass();
							if (clz.isAssignableFrom(apz))
							{
								try
								{
									Method ath = findMethod(apz, mth, type, alen);
									Object robj = ath.invoke(api, args);
									writer.writeUTF("bid=" + bid + ";" + Object2String(new Object[]
									{ robj }));
									Logger.d("写入完毕!");
								} catch (IllegalAccessException | IllegalArgumentException
										| InvocationTargetException e)
								{
									e.printStackTrace();
									// TODO 异常放入返回?
								}
								break;
							}
						}
					} else// 注册客户端功能
					{
						long bid = Long.parseLong(datas[1].split("=")[1]);
						Logger.d("注册客服端能力:" + cls);

					}
				}
			} catch (IOException | ClassNotFoundException e)
			{
				e.printStackTrace();
			}finally
			{
				serverContext.getClientContexts().remove(ClientContext.this);
			}
		}

 

10.运行结构

运行demo服务端,运行多个demo客户端,实现简单聊天室功能

 

 

文章评论

老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
程序员都该阅读的书
程序员都该阅读的书
代码女神横空出世
代码女神横空出世
总结2014中国互联网十大段子
总结2014中国互联网十大段子
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
为什么程序员都是夜猫子
为什么程序员都是夜猫子
如何成为一名黑客
如何成为一名黑客
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
那些争议最大的编程观点
那些争议最大的编程观点
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
程序员和编码员之间的区别
程序员和编码员之间的区别
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
程序员应该关注的一些事儿
程序员应该关注的一些事儿
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
每天工作4小时的程序员
每天工作4小时的程序员
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
程序员必看的十大电影
程序员必看的十大电影
漫画:程序员的工作
漫画:程序员的工作
鲜为人知的编程真相
鲜为人知的编程真相
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
Java程序员必看电影
Java程序员必看电影
程序员的鄙视链
程序员的鄙视链
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
 程序员的样子
程序员的样子
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
10个调试和排错的小建议
10个调试和排错的小建议
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
我是如何打败拖延症的
我是如何打败拖延症的
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
中美印日四国程序员比较
中美印日四国程序员比较
编程语言是女人
编程语言是女人
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
一个程序员的时间管理
一个程序员的时间管理
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有