登录 |  注册
首页 >  编程语言 >  Java多线程与并发编程专题笔记 >  netty实现简单的消息的广播

netty实现简单的消息的广播

思路:

  1. 使用netty提供的ChannelGroup来保存连接到服务端的客户端连接

    1. ChannelGroup的作用(好处)是:通过操作ChannelGroup,实现对加入到ChannelGroup中的所有的Channel进行统一操作

  2. 在Server端的客户端与服务端建立好连接时的回调函数handlerAdded方法中,将新的连接加入到ChannelGroup

    1. channelGroup.add(channel);

  3. (可选)在Server端的客户端与服务端连接关闭时回调函数handlerRemoved方法中,将连接从ChannelGroup中删除

    1. channelGroup.remove(channel);

    2. 此步骤会被netty自动调用,我们调用与否都可以,可以不进行显式调用

  4. 在Server端handler的channelRead0方法中,将收到某客户端的消息发送到ChannelGroup中所有客户端

Netty解决channel管理,可广播消息

在Netty中提供了ChannelGroup接口,该接口继承Set接口,因此可以通过ChannelGroup可管理服务器端所有的连接的Channel,然后对所有的连接Channel广播消息。

Server端

public class BroadCastServer {

    public static void run(int port) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                     .channel(NioServerSocketChannel.class)                // 设置Channel Type
                     .option(ChannelOption.SO_BACKLOG, 1024)            // 设置Channel属性
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                            pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new BroadCastChannelHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            if (channelFuture.isDone()) {
                System.out.println(String.format("server bind port %s sucess", port));
            }
            Channel channel = channelFuture.channel();
            /**CloseFuture异步方式关闭*/
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String []args) {
        BroadCastServer.run(8080);
    }
        
}

public class BroadCastChannelHandler extends ChannelInboundHandlerAdapter {

    private static final Gson GSON = new GsonBuilder().create();
    
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    private static final AtomicInteger response = new AtomicInteger();
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel ch = ctx.channel();
        if (ChannelGroups.size() > 0) {
            Message msg = new Message(ch.remoteAddress().toString().substring(1), SDF.format(new Date()));
            ChannelGroups.broadcast(GSON.toJson(msg), new ChannelMatchers());
        }
        ChannelGroups.add(ch);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (ChannelGroups.contains(ch) && String.valueOf(msg).equals("welcome")) {
            System.out.println(String.format("receive [%s] from [%s] at [%s]", String.valueOf(msg) ,
                                    ch.remoteAddress().toString().substring(1), SDF.format(new Date())));
            response.incrementAndGet();
        }
        synchronized (response) {
            System.out.println(response.get() + "\t" + ChannelGroups.size());
            if (response.get() == ChannelGroups.size() - 1) {
                System.out.println("server close all connected channel");
                ChannelGroups.disconnect();
                response.set(0);
            }
        }
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ChannelGroups.discard(ctx.channel());
        response.decrementAndGet();
    }

    public static class ChannelMatchers implements ChannelMatcher {

        @Override
        public boolean matches(Channel channel) {
            return true;
        }
        
    }
    
}

服务器端收到所有连接客户端对广播消息的响应后,服务器端主动关闭已连接的Channel  

客户端:

public class Client {
 
    private static final String host = "127.0.0.1";
     
    private static final int port = 8080;
     
    private static final ExecutorService es = Executors.newFixedThreadPool(5);
     
    public static void start() {
        for (int i = 0; i < 5; i++) {
            es.execute(new Task());
        }
        es.shutdown();
    }
     
    public static class Task implements Runnable {
 
        @Override
        public void run() {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                         .channel(NioSocketChannel.class)
                         .option(ChannelOption.TCP_NODELAY, true)
                         .handler(new ChannelInitializer<SocketChannel>() {
 
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                pipeline.addLast(new SimpleClientChannelHandler());
                            }
                             
                        });
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                if (channelFuture.isSuccess()) {
                    System.out.println(String.format("connect server(%s:%s) sucess", host, port));
                }
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
         
    }
     
    public static void main(String []args) {
        Client.start();
    }
}
 
 
public class SimpleClientChannelHandler extends ChannelInboundHandlerAdapter {
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(String.format("client(%s) receive message [%s]", channel.localAddress().toString().substring(1),
                                String.valueOf(msg)));
        System.out.println();
        ctx.writeAndFlush(String.valueOf("welcome"));
    }
 
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ctx.disconnect(ctx.newPromise());
        ctx.close();
        System.out.println(String.format("client(%s) close sucess", ctx.channel().localAddress().toString().substring(1)));
    }
}

  本文使用ChannelGroups辅助类管理服务器端已连接的Channel,代码实现如下:

public class ChannelGroups {
 
    private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup("ChannelGroups", GlobalEventExecutor.INSTANCE);
     
    public static void add(Channel channel) {
        CHANNEL_GROUP.add(channel);
    }
     
    public static ChannelGroupFuture broadcast(Object msg) {
        return CHANNEL_GROUP.writeAndFlush(msg);
    }
     
    public static ChannelGroupFuture broadcast(Object msg, ChannelMatcher matcher) {
        return CHANNEL_GROUP.writeAndFlush(msg, matcher);
    }
     
    public static ChannelGroup flush() {
        return CHANNEL_GROUP.flush();
    }
     
    public static boolean discard(Channel channel) {
        return CHANNEL_GROUP.remove(channel);
    }
     
    public static ChannelGroupFuture disconnect() {
        return CHANNEL_GROUP.disconnect();
    }
     
    public static ChannelGroupFuture disconnect(ChannelMatcher matcher) {
        return CHANNEL_GROUP.disconnect(matcher);
    }
     
    public static boolean contains(Channel channel) {
        return CHANNEL_GROUP.contains(channel);
    }
     
    public static int size() {
        return CHANNEL_GROUP.size();
    }
}

原文链接: https://www.yukx.com/xiaomengbao/article/details/2343.html 优科学习网netty实现简单的消息的广播

<<上一课程
下一课程>>
推荐文章
  • 什么是高并发?高并发(HighConcurrency)是一种系统运行过程中遇到的一种“短时间内遇到大量操作请求”的情况,主要发生在web系统集中大量访问收到大量请求(例如:12306的抢票情况;天猫双十一活动)。该情况的发生会导致系统在这段时间内执行大量操作,例如对资源的请求,数据库的操作等。高并发
  • 首先就是创建一个普通的Java工程,起名TrySpring,最终项目如下图:然后在创建两个包分别为service、spring。service用来存放bean、配置类等,spring包用来存放我们手撸的spring注解等在service中创建UserService类,再创建Test类,在Test类中
  •      AQS(AbstractQueuedSynchronizer)是一个抽象的队列同步器,通过维护一个共享资源状态(VolatileIntState)和一个先进先出(FIFO)的线程等待队列来实现一个多线程访问共享资源的同步框架。一、AQS原理     AQS为每个共享资源都设置一个共享资源锁
  • word-break:break-all举例一般情况下,元素拥有默认的white-space:normal(自动换行,PS:不换行是white-space:nowrap),当录入的文字超过定义的宽度后会自动换行,但当录入的数据是一堆没有空格的字符或字母或数字(常规数据应该不会有吧,但有些测试人员是会
  • Bigdecimal的初始化这里对比了两种形式,第一种直接value写数字的值,第二种用string来表示        BigDecimal num1 = new BigDecimal(0.005);         BigDecimal num2 = new BigDecimal(1000000
  • 基本概念定义 双亲委派模型要求除了顶层的启动类加载器外,其余的类加载器都应当有自己的父类加载器。双亲委派机制双亲委派机制是指当一个类加载器收到一个类加载请求时,该类加载器首先会把请求委派给父类加载器。每个类加载器都是如此,只有在父类加载器在自己的搜索范围内找不到指定类时,子类加载器才会尝试自己去加载
学习大纲