Netty如何实现消息推送

netty 是一个利用 java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 api 的客户端/服务器框架。
maven依赖
io.netty   netty-all   4.1.36.final    
springbootapplication
启动器中需要new一个nettyserver,并显式调用启动netty。
@springbootapplication public class springcloudstudydemoapplication {  public static void main(string[] args) {   springapplication.run(springcloudstudydemoapplication.class,args);   try {    new nettyserver(12345).start();    system.out.println(https://blog.csdn.net/moshowgame);    system.out.println(http://127.0.0.1:6688/netty-websocket/index);   }catch(exception e) {    system.out.println(nettyservererror:+e.getmessage());   }  } }  
nettyserver
启动的nettyserver,这里进行配置
/**  * nettyserver netty服务器配置  * @author zhengkai.blog.csdn.net  * @date 2019-06-12  */ public class nettyserver {     private final int port;       public nettyserver(int port) {         this.port = port;     }       public void start() throws exception {         eventloopgroup bossgroup = new nioeventloopgroup();           eventloopgroup group = new nioeventloopgroup();         try {             serverbootstrap sb = new serverbootstrap();             sb.option(channeloption.so_backlog, 1024);             sb.group(group, bossgroup) // 绑定线程池                     .channel(nioserversocketchannel.class) // 指定使用的channel                     .localaddress(this.port)// 绑定监听端口                     .childhandler(new channelinitializer() { // 绑定客户端连接时候触发操作                           @override                         protected void initchannel(socketchannel ch) throws exception {                             system.out.println(收到新连接);                             //websocket协议本身是基于http协议的,所以这边也要使用http解编码器                             ch.pipeline().addlast(new httpservercodec());                             //以块的方式来写的处理器                             ch.pipeline().addlast(new chunkedwritehandler());                             ch.pipeline().addlast(new httpobjectaggregator(8192));                             ch.pipeline().addlast(new websocketserverprotocolhandler(/ws, null, true, 65536 * 10));                             ch.pipeline().addlast(new mywebsockethandler());                         }                     });             channelfuture cf = sb.bind().sync(); // 服务器异步创建绑定             system.out.println(nettyserver.class +  启动正在监听:  + cf.channel().localaddress());             cf.channel().closefuture().sync(); // 关闭服务器通道         } finally {             group.shutdowngracefully().sync(); // 释放线程池资源             bossgroup.shutdowngracefully().sync();         }     } }  
mychannelhandlerpool
通道组池,管理所有websocket连接
/**  * mychannelhandlerpool  * 通道组池,管理所有websocket连接  * @author zhengkai.blog.csdn.net  * @date 2019-06-12  */ public class mychannelhandlerpool {     public mychannelhandlerpool(){}     public static channelgroup channelgroup = new defaultchannelgroup(globaleventexecutor.instance); }  
mywebsockethandler
处理ws一下几种情况:
channelactive与客户端建立连接
channelinactive与客户端断开连接
channelread0客户端发送消息处理
/**  * nettyserver netty服务器配置  * @author zhengkai.blog.csdn.net  * @date 2019-06-12  */ public class nettyserver {     private final int port;       public nettyserver(int port) {         this.port = port;     }       public void start() throws exception {         eventloopgroup bossgroup = new nioeventloopgroup();           eventloopgroup group = new nioeventloopgroup();         try {             serverbootstrap sb = new serverbootstrap();             sb.option(channeloption.so_backlog, 1024);             sb.group(group, bossgroup) // 绑定线程池                     .channel(nioserversocketchannel.class) // 指定使用的channel                     .localaddress(this.port)// 绑定监听端口                     .childhandler(new channelinitializer() { // 绑定客户端连接时候触发操作                           @override                         protected void initchannel(socketchannel ch) throws exception {                             system.out.println(收到新连接);                             //websocket协议本身是基于http协议的,所以这边也要使用http解编码器                             ch.pipeline().addlast(new httpservercodec());                             //以块的方式来写的处理器                             ch.pipeline().addlast(new chunkedwritehandler());                             ch.pipeline().addlast(new httpobjectaggregator(8192));                             ch.pipeline().addlast(new websocketserverprotocolhandler(/ws, websocket, true, 65536 * 10));                             ch.pipeline().addlast(new mywebsockethandler());                         }                     });             channelfuture cf = sb.bind().sync(); // 服务器异步创建绑定             system.out.println(nettyserver.class +  启动正在监听:  + cf.channel().localaddress());             cf.channel().closefuture().sync(); // 关闭服务器通道         } finally {             group.shutdowngracefully().sync(); // 释放线程池资源             bossgroup.shutdowngracefully().sync();         }     } }  
socket.html
主要是连接ws,发送消息,以及消息反馈
         netty-websocket          id      text                 服务端返回的应答消息       
controller
写好了html当然还需要一个controller来引导页面。
@restcontroller public class indexcontroller {    @getmapping(/index)  public modelandview  index(){   modelandview mav=new modelandview(socket);   mav.addobject(uid, randomutil.randomnumbers(6));   return mav;  }   }  
效果演示
改造netty支持url参数
1.首先,调整一下加载handler的顺序,优先mywebsockethandler在websocketserverprotocolhandler之上。
ch.pipeline().addlast(new mywebsockethandler()); ch.pipeline().addlast(new websocketserverprotocolhandler(/ws, null, true, 65536 * 10)); 2.其次,改造mywebsockethandler 的channelread方法,首次连接会是一个fullhttprequest类型,可以通过fullhttprequest.uri()获取完整ws的url地址,之后接受信息的话,会是一个textwebsocketframe类型。public class mywebsockethandler extends simplechannelinboundhandler {     @override     public void channelactive(channelhandlercontext ctx) throws exception {         system.out.println(与客户端建立连接,通道开启!);         //添加到channelgroup通道组         mychannelhandlerpool.channelgroup.add(ctx.channel());     }     @override     public void channelinactive(channelhandlercontext ctx) throws exception {         system.out.println(与客户端断开连接,通道关闭!);         //添加到channelgroup 通道组         mychannelhandlerpool.channelgroup.remove(ctx.channel());     }     @override     public void channelread(channelhandlercontext ctx, object msg) throws exception {         //首次连接是fullhttprequest,处理参数 by zhengkai.blog.csdn.net         if (null != msg && msg instanceof fullhttprequest) {             fullhttprequest request = (fullhttprequest) msg;             string uri = request.uri();             map parammap=geturlparams(uri);             system.out.println(接收到的参数是:+json.tojsonstring(parammap));             //如果url包含参数,需要处理             if(uri.contains(?)){                 string newuri=uri.substring(0,uri.indexof(?));                 system.out.println(newuri);                 request.seturi(newuri);             }         }else if(msg instanceof textwebsocketframe){             //正常的text消息类型             textwebsocketframe frame=(textwebsocketframe)msg;             system.out.println(客户端收到服务器数据: +frame.text());             sendallmessage(frame.text());         }         super.channelread(ctx, msg);     }     @override     protected void channelread0(channelhandlercontext channelhandlercontext, textwebsocketframe textwebsocketframe) throws exception {     }     private void sendallmessage(string message){         //收到信息后,群发给所有channel         mychannelhandlerpool.channelgroup.writeandflush( new textwebsocketframe(message));     }     private static map geturlparams(string url){         map map = new hashmap();         url = url.replace(?,;);         if (!url.contains(;)){             return map;         }         if (url.split(;).length > 0){             string[] arr = url.split(;)[1].split(&);             for (string s : arr){                 string key = s.split(=)[0];                 string value = s.split(=)[1];                 map.put(key,value);             }             return  map;         }else{             return map;         }     } } 3.html中的ws地址也进行改造socket = new websocket(ws://127.0.0.1:12345/ws?uid=666&gid=777); 4.改造后控制台输出情况收到新连接 与客户端建立连接,通道开启! 接收到的参数是:{uid:666,gid:777} /ws 客户端收到服务器数据:142531:这里输入消息 客户端收到服务器数据:142531:这里输入消息 客户端收到服务器数据:142531:这里输入消息  
failed: websocket opening handshake timed out
听说是ssl wss的情况下才会出现,来自 @around-gao 的解决方法: 把mywebsockethandler和websocketserverprotocolhandler调下顺序就好了。


4940元的自动系鞋带耐克太贵?那你可以看看这个
5G正在孕育颠覆性的变革全面构建数字化技术是社会的基础
电池的续航与安全能否并存?
为了满足玩家随时随地玩VR游戏 索泰推出旗下首款背包电脑VR GO
怎样用AVR微控制器集成的ADC从周围环境中获取模拟读数
Netty如何实现消息推送
亮相联通、电信等多场5G峰会 虹软科技用5G+AI重新定义智能手机
单片机是怎么烧录程序的
意法半导体发布两款灵活多用的电源模块,简化SiC逆变器设计
超越摩尔领域的外延设备技术及市场趋势
华为旗下海思首款OLED驱动IC已开始试产
用LM317制作恒压恒流充电器,LM317 Battery charger
iOS10.3正式版怎么升级:iOS10.3正式版固件下载大全
英特尔正在计划收购以色列AI芯片制造商Habana Labs
802.11物理层测试标准(11n HT-4)汇总详解
dfrobot电机(6V 160RPM 2.8kg.cm)简介
Android的阿喀琉斯之踵
51单片机工程师跳槽到stm32的方法
低功耗蓝牙模块带您了解智能血压计_SKYLAB 蓝牙模块应用
什么是5G智慧园区?5G智慧园区解决方案的优势