开发者

springboot整合SSE技术开发小结

开发者 https://www.devze.com 2023-11-25 10:29 出处:网络 作者: 飞翔的佩奇
目录一、开发背景二、快速了解SSE1、概念2、特性三、开发思路四、代码演示1、引入依赖2、服务端代码3、后端定时任务代码4、解决乱码的实体类5、前端代码五、核心代码分析一、开发背景
目录
  • 一、开发背景
  • 二、快速了解SSE
    • 1、概念
    • 2、特性
  • 三、开发思路
    • 四、代码演示
      • 1、引入依赖
      • 2、服务端代码
      • 3、后端定时任务代码
      • 4、解决乱码的实体类
      • 5、前端代码
    • 五、核心代码分析

      一、开发背景

      公司需要开发一个大屏界面,大屏页面的数据是实时更新的,由后端主动实时推送数据给大屏页面。此时会立刻联想到:websocket 技术。当然使用websocket,确实可以解决这个场景。但是今天本文的主角是 :SSE,他和websocket略有不同,SSE只能由服务端主动发消息,而websocket前后端都可以推送消息。

      二、快速了解SSE

      1、概念

      SSE全称 Server Sent Event,顾名思义,就是服务器发送事件,所以也就注定了他编程 只能由服务端发送信息。

      2、特性

      • 主动从服务端推送消息的技术
      • 本质是一个HTTP的长连接
      • 发送的是一个stream流,格式为text/event-stream

      三、开发思路

      要实现后端的实时推送消息,前台实时更新数据,思路如下:

      • 1、前后端需要建立连接
      • 2、后端如何做到实时推送信息呢?可以采用定时调度

      四、代码演示

      1、引入依赖

      原则上是不需要引入的,因为springboot底层已经整合了SSE

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency编程客栈>

      2、服务端代码

      controller层

      @RestController
      @CrossOrigin
      @Requhttp://www.devze.comestMapping("/sse")
      public class SseEmitterController extends BaseController {
      
          @Autowired
          private SseEmitterService sseEmitterService;
      
          /**
           * 创建SSE连接
           *
           * @return
           */
          @GetMapping("/connect/{type}")
          public SseEmitter connect(@PathVariable("type") String type) {
              return sseEmitterService.connect(type);
          }
      }
      
      

      service层

      public interface SseEmitterService {
      
          SseEmitter connect(String type);
      
          void volumeOverview();
      
          void sysOperation();
      
          void monitor();
          ........
      }
      

      service实现层

      @Service
      public class SseEmitterServiceImpl implements SseEmitterService {
      
          private final Logger logger = LoggerFactory.getLogger(this.getClass());
      
          private static Map<String, SseEmitterUTF8> sseCache = new ConcurrentHashMap<>();
      
      
          /**
           * 创建连接sse
           * @param type
           * @return
           */
          @Override
          public SseEmitter connect(String type) {
      
              final String clientId = UUID.randomUUID().toString().replace("-", "");
      
              SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);
              try {
                  sseEmitter.send(SseEmitter.event().comment("创建连接成功 !!!"));
              } catch (IOException e) {
                  logger.error("创建连接失败 , {} " , e.getMessage());
              }
              sseEmitter.onCompletion(() -> {
                  logger.info("connect onCompletion , {} 结束连接 ..." , clientId);
                  removeClient(clientId);
              });
              sseEmitter.onTimeout(() -> {
                  logger.info("connect onTimeout , {} 连接超时 ..." , clientId);
                  removeClient(clientId);
              });
              sseEmitter.onError((throwable) -> {
                  logger.error("connect onError , {} 连接异常 ..." , clientId);
                  removeClient(clientId);
              });
              sseCache.put(clientId, sseEmitter);
      
              //立即推送
              volumeOverview();
              dealResp();
              monitor();
              if (type.equals(SseEmitterConstant.OVER_VIEW)){
                  sysOperation();
                  mileStone();
              }
              logger.info("当前用户总连接数 : {} " , sseCache.size());
              return sseEmitter;
          }
      
          /**
           * 交易量概览
           */
          @Override
          public void volumeOverview() {
      
              Map<String,Object> map = new HashMap<>();
              map.put("latest_tps",440.3);
              map.put("total_cics_trans",341656001);
              map.put("total_zjcx_trans",391656001);
              map.put("zjcx_tps",23657);
              map.put("day10",48388352);
         编程     map.put("history",105013985);
      
              SseEmitter.SseEventBuilder data = SseEmitter.event().name(SseEmitterConstant.VOLUME_OVERVIEW).data(map, MediaType.APPLICATION_jsON);
      
              for (Map.Entry<String, SseEmitterUTF8> entry : sseCache.entrySet()) {
                  SseEmitterUTF8 sseEmitter = entry.getValue();
                  if (sseEmitter == null) {
                      continue;
                  }
                  try {
                      sseEmitter.send(data);
                  } catch (IOException e) {
                      String body = "SseEmitterServiceImpl[volumeOverview  ]";
                      logger.error(body + ": 向客户端 {} 推送消息失败 , 尝试进行重推 : {}", entry.getKey() ,e.getMessage());
                      messageRepush(entry.getKey(),data,body);
                  }
      
              }
          }
      		private void messageRepush(String type, SseEmitter.SseEventBuilder data,String body){
              for (int i = 0; i < 3; i++) {
                  try {
                      Thread.sleep(2000);
                      SseEmitterUTF8 sseEmitter = sseCache.get(type);
                      if (sseEmitter == null) {
                          logger.error(body + " :向客户端{} 第{}次消息重推失败,未创建长链接", type, i + 1);
                          continue;
                      }
                      sseEmitter.send(data);
                  } catch (Exception ex) {
                      logger.error(body + " :向客户端{} 第{}次消息重推失败", type, i + 1, ex);
                      continue;
                  }
                  logger.info(body + " :向客户端{} 第{}次消息重推成功", type, i + 1);
                  return;
              }
          }
      

      常量类

      public class SseEmitterConstant {
      
          /**
           * 创建连接的客户端类型
           */
          public static final String OVER_VIEW = "overview";
      
      
          /**
           * even 数据类型
           */
          public static final String VOLUME_OVERVIEW = "vw";
      
      
      
          public SseEmitterConstant(){}
      }
      
      

      3、后端定时任务代码

      采用注解的方式实现:@Scheduled,使用该注解时,需要增加这个注解@EnableScheduling,相当于来开启定时调度功能,如果不加@EnableScheduling注解,那么定时调度会不生效的。

      启动类增加注解@EnableScheduling

      package com.hidata;
      
      import org.myBATis.spring.annotation.MapperScan;
      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
      import org.springframework.context.annotation.ComponentScan;
      import org.springframework.scheduling.annotation.EnableScheduling;
      
      @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
      @EnableScheduling
      public class HidataApplication {
      
          public static void main(String[] args)
          {
              SpringApplication.run(HidataApplication.class, args);
              System.out.println("[HiUrlShorter platform startup!]");
          }
      }
      
      

      创建 定时任务调度类,在该类上加上@Scheduled注解,

      @Configuration
      public class SendMessageTask{
      
          private final Logger logger = LoggerFactory.getLogger(this.getClass());
      
          @Autowired
          private SseEmitterService sseEmitterService;
      
          @Scheduled(cron = "0/40 * * * * ?}")
          public void volumeOverviewTask() {
      
              try {
                  sseEmitterService.volumeOverview();
              } catch (Exception e) {
                  logger.error("SendMessageTask [volumeOverviewTask]: {} ",e.getMessage());
              }
          }
      .......
      }
      
      
      

      4、解决乱码的实体类

      如果发送中文数据的时候,会出现乱码的现象。此时需要做对应的处理

      package com.hidata.devops.lagrescreen.domain;
      
      import org.springframework.http.HttpHeaders;
      import org.springframework.http.MediaType;
      import org.springframework.http.server.ServerHttpResponse;
      import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
      
      import Java.nio.charset.StandardCharsets;
      
      public class SseEmitterUTF8 extends SseEmitter {
      
          public SseEmitterUTF8(Long timeout) {
              super(timeout);
          }
      
          @Override
          protected void extendR编程客栈esponse(ServerHttpResponse outputMessage) {
              super.extendResponse(outputMessage);
      
              HttpHeaders headers = outputMessage.getHeaders();
              headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
          }
      }
      
      
      

      5、前端代码

          // 连接服务器
          var sseSource = new EventSource("http://localhost:8080/sse/connect");
          // 连接打开
          sseSource.onopen = function () {
              console.log("连接打开");
          }
      
          // 连接错误
          sseSource.onerror = function (err) {
              console.log("连接错误:", err);
          }
          
      	//接收信息
          eventSource.addEventListener("vw", function (event) {
          console.log(event.data);
          .....
        });
      

      五、核心代码分析

      先看代码片段

      SseEmitter.event().name("vw").data(map, MediaType.APPLICATION_JSON);
      

      分析:

      后端不会把所有数据一起发送给前端,而是会把页面分成多个模块,然后发给前端,此时前端需要区分哪一块数据对应哪一块页面。所以我们可以给各个模块的数据起个名字。也就是上述的代码

      SseEmitter.event().name("vw")
      

      这样,前端就知道怎么渲染页面了,类似于这样

      springboot整合SSE技术开发小结

      关于even()的属性,可以查看源码,

      public interface SseEventBuilder {
              SseEmitter.SseEventBuilder id(String var1);
      
              SseEmitter.SseEventBuilder name(String var1);
      
              SseEmitter.SseEventBuilder reconnectTime(long var1);
      
              SseEmitter.SseEventBuilder comment(String var1);
      
              SseEmitter.SseEventBuilder data(Object var1);
      
              SseEmitter.SseEventBuilder data(Object var1, @Nullable MediaType var2);
      
              Set<DataWithMediaType> build();
          }
      

      springboot整合SSE技术开发小结

      到此这篇关于springboot整合SSE技术开发小结的文章就介绍到这了,更多相关springboot整合SSE内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)! 

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号