netty (3) 채팅 서버 만들어보자!

netty (3) 채팅 서버 만들어보자!

이번에는 배운걸 기준으로 채팅 서버를 만들어 볼 예정이다.
아주 간단하게 메시지를 보내는거와 귓속말을 할 수 있는 서비스를 만들어보자.

public class ChatNettyServer {

  private static final ChatNettyServiceHandler SHARED = new ChatNettyServiceHandler();

  public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
          protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
              .addLast(new StringDecoder(CharsetUtil.UTF_8), new StringEncoder(CharsetUtil.UTF_8))
              .addLast(new ChatNettyMessageCodec(), new LoggingHandler(LogLevel.INFO))
              .addLast(SHARED);
          }
        });

      Channel ch = b.bind(8080).sync().channel();
      ch.closeFuture().sync();

    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      workerGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}

메인은 따로 설명할 필요는 없을 거 같다.

ChatNettyMessageCodecStringDecoder, StringEncoder 만 설정해줬다. 각자의 맞게 커스텀하게 코덱을 설정 할 수 있다.
한번 codec 클래스를 보자. 그럼 대충은 이해가 갈 듯하다.

public class ChatNettyMessageCodec extends MessageToMessageDecoder<String> {

  @Override
  protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
    String command = msg.substring(0, 2);
    String message = msg.substring(2, msg.length() - 1) + "\n";
    Herder herder = new Herder();
    herder.setCommand(command);
    out.add(new Message(herder, message));
  }
}

MessageToMessageDecoder를 상속 받았다. 클래스 시그네처는 다음과 같다.
MessageToMessageDecoder<I>
인바운드를 설정 해 줄 수 있다. 인바운드를 String으로 설정 한 후에 필자가 만든 Message로 넣어뒀다.
Message는 자바빈처럼 만들었다.

@Data
@AllArgsConstructor
public class Message {
  private Herder herder;
  private String text;

}

@Data
public class Herder {
  private String command;
}

일반적인 자바 빈이다.
다음으로 이벤트 핸들러 이다.

@ChannelHandler.Sharable
public class ChatNettyServiceHandler extends SimpleChannelInboundHandler<Message> {

  private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  final AttributeKey<Integer> id = AttributeKey.newInstance("id");
  private static final AtomicInteger count = new AtomicInteger(0);

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    int value = count.incrementAndGet();
    ctx.channel().attr(id).set(value);
    ctx.writeAndFlush("your id : "+ String.valueOf(value) + "\n");
    channels.writeAndFlush(String.valueOf(value) + " join \n");
    channels.add(ctx.channel());
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().attr(id).remove();
    channels.remove(ctx.channel());
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {

    if ("10".equals(msg.getHerder().getCommand())) {
      channels.writeAndFlush(ctx.channel().attr(id).get() + " :" + msg.getText());;
    } else if ("20".equals(msg.getHerder().getCommand())) {

      String text = msg.getText();
      String substring = text.substring(0, 2).trim();
      String message = text.substring(2, text.length());

      channels.stream().filter(i -> i.attr(id).get() == Integer.parseInt(substring))
        .forEach(i -> i.writeAndFlush(i.attr(id).get() + " : " +message));

      ctx.writeAndFlush(ctx.channel().attr(id).get() + " : " + message);

    } else if("30".equals(msg.getHerder().getCommand())){
      ctx.disconnect();
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

여기서 중요한건 SimpleChannelInboundHandler 상속받았다.
ChannelInboundHandlerAdapter 클래스를 상속 받아도 상관없다.
SimpleChannelInboundHandler 또한 ChannelInboundHandlerAdapter를 상속받은 아이다.

public void channelActive(ChannelHandlerContext ctx) throws Exception

이때는 접속 정보를 넣어 뒀다. 그리고 전체 사용자에게 사용자가 입장했다고 전송해줬다.

public void channelInactive(ChannelHandlerContext ctx) throws Exception 

이때에는 클라이언트가 종료를 한 후라 접속정보를 삭제해줬다.

protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception

원래 ChannelInboundHandlerAdapter에선 Object 이지만 SimpleChannelInboundHandler은 타입을 지정해 줄수 있다.
그래서 이걸로 선택했다.
커멘드가 10 일때에는 전체 채팅 20 일때는 귓속말 30일때는 종료 한다.
얼추 소스를 보면 이해가 갈듯 싶다.

telnet으로 접속을 해보자

telnet localhost 8080

Trying ::1...
Connected to localhost.
Escape character is '^]'.
your id : 1

위와 같이 your id : 1이 출력 될 것이다.
창을 한개 더 띄어 telnet을 접속하자
그럼 다음에

10 hello

라는 커멘드를 써보자.
그럼 양쪽에 1 : hello 다음같이 출력 될 것이다.
다음은 귓속말을 해보자
1번 사용자에서

202 hello2

입력하면 2번 사용자에게만 전달 될 것이다.
20은 커멘드 2는 id(두자리라 스페이스) 그런다음에 채팅 문자로 입력 하면 된다.
마지막으로 30을 치고 나가자!

30
Connection closed by foreign host.

다음과 같이 출력된다면 성공적으로 되었다.
얼추 netty의 채팅에 대해서 알아봤다.

netty (2) 이벤트 핸들러

netty

이번엔 이벤트 핸들러에 대해 알아보자
ChannelInboundHandlerAdapter 클래스는 ChannelInboundHandler 인터페이스를 사용한 어댑터이다.
실질적인 구현에 대한 로직은 없다.
아래는 ChannelInboundHandler 인터페이스의 내용이다.

    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    void channelActive(ChannelHandlerContext ctx) throws Exception;

    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

자주쓰는 메소드들을 살펴보자.

channelRegistered

채널이 이벤트 루프에 등록되었을 때 발생한다.
이벤트 루프는 네티가 이베트를 실행하는 스레드로써 부트 스트랩에 설정한 이벤트 루프다.

channelActive

channelRegistered 다음에 바로 실행 되는 메소드다. 이벤트 루프에 등록된 이후에 네티 API를 사용하여 채널 입출력을 수행할 상태다.
다음과 같은 작업을 할 수 있다.
클라이언트 연결 개수를 셀 때
최초 연결 메시지를 보낼 때
연결된 상태에 대한 작업이 필요 할때

channelRead

channelRead 는 제일 빈도가 높은 메소드이다.
데이터가 수신 되었을 때 알려주는 메소드이다. 수신된 데이터는 네티의 ByteBuf 객체에 저장되어 있으며 두번째 인자 msg를 통해 접근가능하다. 실질적인 비지니스 로직도 여기에서 이뤄질거 같다.

channelReadComplete

데이터 수신이 완료되었을음 알려준다.
간단하게 예를 들어 보자. 클라리언트가 서버로 ‘A’, ‘B’, ‘C’ 라는 데이터를 순차적으로 전송했다고 하자. 이때 서버에서는 channelRead 이벤트가 발생하는데 이때 msg 객체에 수신된 데이터가 ‘ABC’라면 다음으로 발생하는 이벤트는 channelReadComplete다. 반대로 msg객체에 수신된 데이터가 ‘A’라면 다음으로 발생하는 이벤트는 channelRead이다. 즉 데이터를 다 읽어 없을때 발생하는 이벤트가 바로 channelReadComplete이다.

channelInactive

channelActive와 반대로 채널이 비활성화 되었을 때 발생한다. 해당 이벤트가 발생한 이후에는 채널에 해당 입출력 작업을 수행 못한다.

channelUnregistered

channelRegistered의 이벤트와 반대로 채널이 이벤트 루프에서 제거 되었을 때 발생한다. 이거 또한 채널에 입출력 작업을 수행하지 못한다.

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
  System.out.println("channelActive");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  System.out.println("channelInactive");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  System.out.println("channelRead");
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  System.out.println("channelReadComplete");
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  System.out.println("channelRegistered");
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  System.out.println("channelUnregistered");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  cause.printStackTrace();
  ctx.close();
}

순서는 즉 이렇다.
클라이언트가 접속 했을 경우 발생
channelRegistered
channelActive
위에 두개가 출력된다.

클라이언트가 메시지를 보낼 때 발생
channelRead

모든 메시지를 다 읽고 없을 때 발생
channelReadComplete

클라이언트가 접속이 끊어 졌을 때
channelInactive
channelUnregistered

이러식으로 이벤트 핸들러가 발생한다.

netty (1)

netty

예전에 책을 샀는데 쓸일이 있을 것도 같아서(혹은 vertx) 공부해본다.
책에 있는 내용을 정리 해본다.

블로킹과 논블러킹

블로킹은 요청한 작업이 성공하거나 에러가 발생하기 전까지 응답을 해주지 않는다.
논블로킹은 성공여부와 상관없이 바로 결과를 돌려주는 것을 말한다.
간단하게(?) 알아보자

public class BlockingServer {
  public static void main(String[] args) throws IOException {
    new BlockingServer().run();
  }

  private void run() throws IOException {
    ServerSocket serverSocket = new ServerSocket(8080);
    System.out.println("접속 대기중");
    while (true) {
      Socket socket = serverSocket.accept();
      System.out.println("클라이언트와 연결");
      OutputStream outputStream = socket.getOutputStream();
      InputStream inputStream = socket.getInputStream();

      while (true) {
        try {
          int request = inputStream.read();
          System.out.print((char) request);
          outputStream.write(request);
        } catch (IOException e) {
          break;
        }
      }
    }
  }
}

자바에서 기본적으로 사용 할 수 있는 블로킹 socket 서버다.
블로킹 소켓의 문제점은 호출된 입출력 메서드의 처리가 완료될 때 까지 응답을 돌려 주지 않는다.
그래서 여러 클라이언트들이 접속을 했을 경우엔 시간이 오래 걸리는 일을 한다면 그 다음 클라이언트는 대기를 하고 있어야된다.
그래서 나온게 클라이언트가 접속 했을 경우 새로운 스레드를 만들어서 처리 하는 방법이 생겨났다.
그러나 이거에 문제점은 접속자가 많을 경우 스레드양이 어마어마하게 증가 할 것이다.
이걸 막기 위해 스레드풀이라는걸 사용한다.
스레드 풀 사용 해도 동시접속자가 늘어나면 스레드 풀의 크기를 늘려야 된다. 아주 고민이 많다.
스레드 풀을 자바힙이 허용하는 최대 한도에 도달할 때까지 늘리는 것이 맞는지도 봐야된다.

이런 단점을 개선한 방식이 논블로킹 소켓이다.
일단 소스를 보자. 복잡하다.

public class NonBlockingServer {

    private Map<SocketChannel, List<byte[]>> keepDataTrack = new HashMap<>();
    private ByteBuffer buffer = ByteBuffer.allocate(2*1024);

    public static void main(String[] args) {
        NonBlockingServer nonBlockingServer = new NonBlockingServer();
        nonBlockingServer.startEchoServer();
    }

    private void startEchoServer() {

        try(Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()){
            if((serverSocketChannel.isOpen()) && selector.isOpen()){
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.bind(new InetSocketAddress(8888));

                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("접속 대기 중");

                while(true) {
                    selector.select();
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();

                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            this.acceptOP(key, selector);
                        } else if (key.isReadable()) {
                            this.readOP(key);
                        } else if (key.isWritable()) {
                            this.write(key);

                        } else {
                            System.out.println("서버 소켓 생성 못함");
                        }
                    }
                }
            }


        } catch (Exception e){
            System.out.println("에러 ");
        }
    }

    private void write(SelectionKey key) throws IOException, InterruptedException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        List<byte[]> channelData = keepDataTrack.get(socketChannel);
        Iterator<byte[]> its = channelData.iterator();

        while(its.hasNext()){
            byte[] it = its.next();
            its.remove();
            socketChannel.write(ByteBuffer.wrap(it));
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    private void readOP(SelectionKey key) {
        try{
            SocketChannel socketChannel = (SocketChannel) key.channel();
            buffer.clear();
            int numRead = -1;
            try{
                numRead = socketChannel.read(buffer);
            } catch (IOException e){
                System.out.println("데이터 읽기 에러 ");
            }
            if(numRead == -1){
                this.keepDataTrack.remove(socketChannel);
                System.out.println("클라이언트 연결 종료");
                socketChannel.close();
                key.cancel();
                return;
            }

            byte[] data = new byte[numRead];
            System.arraycopy(buffer.array(), 0 ,data, 0, numRead);
            System.out.println(new String(data, "utf-8") + " from" + socketChannel.getRemoteAddress());

            doEchoJob(key, data);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private void doEchoJob(SelectionKey key, byte[] data) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        List<byte[]> channelData = keepDataTrack.get(socketChannel);
        channelData.add(data);
        key.interestOps(SelectionKey.OP_WRITE);
    }

    private void acceptOP(SelectionKey key, Selector selector) throws IOException{
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        System.out.println("클라이언트 연결 " + socketChannel.getRemoteAddress());
        keepDataTrack.put(socketChannel, new ArrayList<byte[]>());
        socketChannel.register(selector, SelectionKey.OP_READ);
    }
}

데이터를 읽고 쓰는 부분의 로직이 완전히 분리 되어있다.
구조는 다음과 같다.

블로킹

blocking

논블로킹

nonblocking

그림이 영 시원찮네.
물론 자바의 기본적인 socket server를 구현해서 개발해도 상관은 없다.
하지만 유지보수면이나 개발의 난이도를 볼때 netty를 이용하는게 더 효율적이지 않나 싶다.

네티의 기본적인 에코 서버를 한번 만들어보자.

public class NettyServer {

  public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new EchoServiceHandler());
          }
        });

      Channel ch = b.bind(8080).sync().channel();
      ch.closeFuture().sync();

    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      workerGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}

NioEventLoopGroup 인자는 스레드 개수를 의미한다.
만약 없을 경우에는 코어 수에 따른 스레드가 발생한다.(더 자세히는 코어 수 * 2 , 하이퍼 스레딩을 지원한다면 코어 * 2(하이퍼 스레딩) * 2)
첫번째의 bossgroup은 클라이언트의 연결을 수학하는 역할을 하고 두번째의 workgroup은 연결된 클라리언트의 데이터 입력출 및 이벤트 처리를 담당한다.
option들은 책 혹은 인터넷에 찾아보기 바라며 중요한건 파이브라인에 EchoServiceHandler를 등록하는 것이다.
이는 클라이언트의 연결이 되었을때 데이터를 처리하는 역할을 한다.
netty는 아주 간편하게 블로킹 논블로킹 epoll로 바꿀수가 있다.
위의 클래스에서 다음과 같이 3개만 변경하면 된다.

EventLoopGroup bossGroup = new OioEventLoopGroup(1); //변경 부분
EventLoopGroup workerGroup = new OioEventLoopGroup(); //변경 부분

...

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(OioServerSocketChannel.class) //변경 부분
...

아주 간편하게 바뀌었다.
epoll도 마찬가지로 3부분만 바꾸면 가능하다.
epoll은 리눅스에서만 실행된다. (mac도 안된다.)

다음으로 EchoServiceHandler 클래스를 보자

public class EchoServiceHandler extends ChannelInboundHandlerAdapter{

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
    System.out.print(byteBuf.toString(Charset.defaultCharset()));
    ctx.writeAndFlush(msg);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

ChannelInboundHandlerAdapter 클래스를 상속받았다.
그리고 channelRead과 exceptionCaught를 오버라이딩 했다.
channelRead은 클라이언트에서 데이터를 보내면 호출 되는 메서드이다.
channelRead에 있는 writeAndFlush 메서드는 데이터의 기록과 전송을 하는 메소드이다.
한마디로 write와 flush가 함께작동하는 메서드라 생각하면 되겠다.

exceptionCaught 메서드는 오류가 발생 하였을 경우 호출 된다.

한번 telnet으로 접속해서 테스트를 해보자

telnet localhost 8080

그리고 나서 제대로 동작하는지 커멘드를 날려보자!
간단하게 네티에 대해서(?) 알아봤다.
몇편 더 쓸예정인데.. 아무튼 공부좀 더 하고 써야겠다.