(五)微服务底层通信和协议--1篇

懒驴 2021年12月04日 1,588次浏览

Java网络通信

传统BIO编程

通信的本质其实就是I/O,Java的网络编程主要涉及的内容是Socket编程,其他还有多线程编程、协议栈等相关知识。在JDK1.4推出Java NIO之前,基于Java的所有Socket通信都采用同步阻塞模式(BIO),类似于一问一答模式。客户端发起一次请求,同步等待调用结果的返回。同步阻塞模式易于调试且容易理解,但是存在严重的性能问题。
传统的的同步阻塞模式开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。服务端提供IP和监听端口,客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。
BIO的服务端通信模型,采用BIO通信模型的服务端,通过由一个独立的Acceptor(消费者)线程负责监听客户端连接,它接收到客户端连接请求之后,为每个客户端创建一个新的线程进行链路处理。处理完成后,通过输出流返回应答客户端,线程销毁,即典型的一请求一应答通信模型。

传统BIO通信模型

该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问呈1:1的正比关系,Java中的线程也是比较宝贵的系统资源,线程的数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终会崩掉。

一个简单的Socket通信

Client类
  1. 编写Socket客户端 Client,代码如下:
package com.study.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/***
 * Socket客户端
 */
public class Client {

    private static int DEFAULT_SERVER_PORT=1234;            //默认端口

    private static String DEFAULT_SERVER_IP="127.0.0.1";    //默认的服务器IP

    //发送信息
    public static void send(String expression){
        send(DEFAULT_SERVER_PORT, expression);
    }

    public static void send(int port, String expression){
        System.out.println("客户端算术表达式为:"+expression);
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            //1. 创建socket对象
            socket = new Socket(DEFAULT_SERVER_IP, port);
            //2. 获取此套接字的输出流,并包装BufferedReader对象
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            //3. 获取输出流,并保包装PrintWriter对象
            out = new PrintWriter(socket.getOutputStream(), true);
            //4. 往服务端写数据
            out.println(expression);
            //5. 获取服务端返回的数据
            System.out.println("返回的结果为:"+in.readLine());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //6. 结束关闭相关的流
            if(in != null){
                try{
                    in.close();
                }catch (IOException e){
                    e.printStackTrace();
                }
                in = null;
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(socket != null){
                try{
                    socket.close();
                }catch (IOException e){
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

ServerBetter类
  1. 编写Socket服务端 ServerBetter,代码如下:
package com.study.socket;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/***
 * Socket服务器端
 */
public class ServerBetter {

    private static int DEFAULT_PORT = 1234;        //默认的端口号

    /**单例的ServerSocket**/
    private static ServerSocket server;
    /**根据传入参数设置监听端口,如果没有参数,就调用默认的方法**/
    public static void start() throws IOException{
        //使用默认端口
	start(DEFAULT_PORT);
    }
    //这里我们使用synchronized,因为这个方法不会被大量访问,不太需要考虑效率,直接进行方法同步就可以
    public synchronized static void start(int port) throws IOException{
        if(server != null){
            return;
        }
        try{
            //1. 通过构造函数创建ServerSocket,如果端口合法且空闲,服务端就会监听成功
            server = new ServerSocket(port);
            //2. 通过无限循环监听客户端连接,如果没有客户端接入,将阻塞在accept操作上
            while (true){
                Socket socket = server.accept();
                //3. 当有新的客户端接入时,创建一个新的线程处理这条Socket链路
                new Thread(new ServerHandler(socket)).start();
            }
        }finally {
            //4. 服务器关闭时,清理相关的资源
            if(server != null){
                System.out.println("服务器已经关闭!");
                server.close();
                server = null;
            }
        }
    }

}

ServerHandler类
  1. 编写服务线程类 ServerHandler,类似于服务端收到客户端请求后,创建新的线程执行任务,代码如下:
package com.study.socket;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/***
 * 服务线程类,类似于服务端收到客户端请求后,创建新的线程执行任务
 */
public class ServerHandler implements Runnable {
    private Socket socket;
    public ServerHandler(Socket socket){
        this.socket=socket;
    }
    //线程
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            //1. 获取字符串的输入流,并包装BufferedReader对象
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            //2. 获取输出流,并包装PrintWriter对象
            out = new PrintWriter(socket.getOutputStream(), true);
            String expression, result;
            while (true){
                //3. 通过BufferedReader读取一行
                //如果已经读到输入流尾部,就返回null,退出循环
                //如果得到非空,就尝试计算结果返回
                if((expression=in.readLine())==null){
                    break;
                }
                System.out.println("服务端收到的消息:"+expression);
                try{
                    //4. 计算客户端传入的字符串,这里简单处理,全部返回结果123
                    result="结果123";
                }catch (Exception e){
                    result="计算错误:"+e.getMessage();
                }
                //5. 将结果写入输出流,返回给客户端
                out.println(result);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //6. 清理相关资源
            if(in != null){
                try{
                    in.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
                in = null;
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(socket != null){
                try{
                    socket.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

BioTest类
  1. 编写测试类BioTest,代码如下:
package com.study.socket;

import java.io.IOException;
import java.util.Random;

/***
 * BIO测试类
 */
public class BioTest {
    public static void main(String[] args) throws InterruptedException{
        //1. 启动线程,运行服务器
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    ServerBetter.start();
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
        }).start();

        //2. 主线程sleep 100毫秒,避免客户端在执行服务器启动前执行后面的代码
        Thread.sleep(100);

        //3.启动线程,运行客户端
        final char operators[] = {'+', '-', '*', '/'};
        final Random random = new Random(System.currentTimeMillis());
        new Thread(new Runnable() {
            @SuppressWarnings("static-access")
            @Override
            public void run() {
                //4. 无限循环
                while (true){
                    //随机产生算术表达式
                    String expression = random.nextInt(10)+""+operators[random.nextInt(4)]
                            +(random.nextInt(10)+1);
                    //客户端发送算术表达式字符串给服务端
                    Client.send(expression);
                    try{
                        //线程sleep1000毫秒(随机),也就是1秒
                        Thread.currentThread().sleep(random.nextInt(1000));
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

运行测试类,结果如下:
测试运行结果

但是,如果,通过上面的例子测试,假设存在这么一个场景,由于网络延迟,导致数据发送缓慢,由于使用的是阻塞IO,read方法一直处于阻塞状态,要等到数据传送完成才结束。在这种情况且高并发的情况下,直接会导致线程暴增,服务器宕机。

以上就是对传统BIO编程做了简单的介绍,下一篇将介绍伪异步I/O编程NIO编程