Implementing a Redis Stream Queue in Spring Boot: A Code Example

开发者 https://www.devze.com 2024-09-13 10:18 Source: Internet Author: 保加利亚的风
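Contents
  • Preface
  • Preparation
    • 1. pom
    • 2. yml
    • 3. RedisStreamUtil utility class
  • Implementation
    • Producer: sending messages
    • Consumer: listening for and consuming messages
    • Listener test
  • Summary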

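    Preface

    This article shows a simple way to work with a Redis Stream queue in Spring Boot: a producer adds messages to the stream, and a listener consumes them.

    • JDK: 1.8
    • Spring Boot version: 2.6.3
    • Redis: 5.0.1 (Streams are available only in Redis 5.0 and later)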

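    Preparation

    1. pom

    Redis starter dependency (version 2.6.3, matching the Spring Boot version) and Lombok, whose annotations are used by the classes below: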

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
    

    2. yml

    spring: 
      redis:
        database: 0
        host: 127.0.0.1
    

    3. RedisStreamUtil utility class

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.StreamInfo;
    import org.springframework.data.redis.connection.stream.StreamOffset;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.Map;
    
    @Component
    public class RedisStreamUtil {
    
    	@Autowired
    	private RedisTemplate<String, Object> redisTemplate;
    
    	/**
    	 * Creates a consumer group.
    	 *
    	 * @param key   stream key
    	 * @param group group name
    	 * @return {@link String}
    	 */
    	public String oup(String key, String group) {
    		return redisTemplate.opsForStream().createGroup(key, group);
    	}
    
    	/**
    	 * Gets information about the consumers in a group.
    	 *
    	 * @param key   stream key
    	 * @param group group name
    	 * @return {@link StreamInfo.XInfoConsumers}
    	 */
    	public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
    		return redisTemplate.opsForStream().consumers(key, group);
    	}
    
    	/**
    	 * Queries the consumer groups of a stream.
    	 *
    	 * @param key stream key
    	 * @return {@link StreamInfo.XInfoGroups}
    	 */
    	public StreamInfo.XInfoGroups queryGroups(String key) {
    		return redisTemplate.opsForStream().groups(key);
    	}
    
    	// Add a record whose body is a Map; returns the generated record ID
    	public String addMap(String key, Map<String, Object> value) {
    		return redisTemplate.opsForStream().add(key, value).getValue();
    	}
    
    	// Read all records from the beginning of the stream
    	public List<MapRecord<String, Object, Object>> read(String key) {
    		return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    	}
    
    	// Acknowledge records as processed for the given consumer group
    	public Long ack(String key, String group, String... recordIds) {
    		return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
    	}
    
    	// Delete records. When every record in a node has been deleted, the node is destroyed automatically
    	public Long del(String key, String... recordIds) {
    		return redisTemplate.opsForStream().delete(key, recordIds);
    	}
    
    	// Check whether the key exists
    	public boolean hasKey(String key) {
    		Boolean aBoolean = redisTemplate.hasKey(key);
    		return aBoolean != null && aBoolean;
    	}
    }

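    Implementation

    Producer: sending messages

    In the service layer, create an addMessage method that sends a message to the queue.

    The first argument of addMap() is the stream key and the second is the message body. The key must match the one used later in the listener configuration ("mystream" here), so keep it in mind.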

    @Service
    @Slf4j
    @RequiredArgsConstructor
    public class RedisStreamMqServiceImpl implements RedisStreamMqService {
    
        private final RedisStreamUtil redisStreamUtil;
    
        /**
         * Sends one message.
         *
         * @return the ID of the added record
         */
        @Override
        public Object addMessage() {
            RedisUser redisUser = new RedisUser();
            redisUser.setAge(18);
            redisUser.setName("hcr");
            redisUser.setEmail("156ef561@gmail.com");
    
            Map<String, Object> message = new HashMap<>();
            message.put("user", redisUser);
    
            String recordId = redisStreamUtil.addMap("mystream", message);
            return recordId;
        }
    }
    

    Controller endpoint:

    @RestController
    @RequestMapping("/redis")
    @Slf4j
    @RequiredArgsConstructor
    public class RedisController {
    
        private final RedisStreamMqService redisStreamMqService;
    
        @GetMapping("/addMessage")
        public Object addMessage() {
            return redisStreamMqService.addMessage();
        }
    }
    

    Call the endpoint and check that the data was added to Redis.

    Response from the endpoint:

    1702622585248-0
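
    The value returned by the endpoint is the ID that Redis generated for the new stream entry. Auto-generated IDs have the form `<millisecondsTime>-<sequenceNumber>`: a Unix time in milliseconds followed by a counter that distinguishes entries created within the same millisecond. The sketch below splits such an ID using only the JDK. The `StreamIdParts` class and its method names are assumptions made for illustration; they are not part of the article's project or of Spring Data Redis.

```java
import java.time.Instant;

// Hypothetical helper (not part of the article's project): splits a Redis Stream
// entry ID of the form "<millisecondsTime>-<sequenceNumber>" into its two parts.
final class StreamIdParts {
    final long timestampMillis;
    final long sequence;

    private StreamIdParts(long timestampMillis, long sequence) {
        this.timestampMillis = timestampMillis;
        this.sequence = sequence;
    }

    // Parses an ID such as "1702622585248-0"; rejects strings without both parts.
    static StreamIdParts parse(String id) {
        int dash = id.indexOf('-');
        if (dash <= 0 || dash == id.length() - 1) {
            throw new IllegalArgumentException("Not a stream entry ID: " + id);
        }
        long millis = Long.parseLong(id.substring(0, dash));
        long seq = Long.parseLong(id.substring(dash + 1));
        return new StreamIdParts(millis, seq);
    }

    // Converts the millisecond part into an Instant for logging or comparison.
    Instant createdAt() {
        return Instant.ofEpochMilli(timestampMillis);
    }

    public static void main(String[] args) {
        StreamIdParts parts = parse("1702622585248-0");
        System.out.println("millis=" + parts.timestampMillis
                + ", sequence=" + parts.sequence
                + ", createdAt=" + parts.createdAt());
    }
}
```

    Because the first part is a timestamp, IDs sort in insertion order, which is what lets a consumer resume from the last ID it has seen.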

    Check the data in Redis:

    [Screenshot: the stream data in Redis]

    Consumer: listening for and consuming messages

    Create the RedisConsumersListener listener:

    import cn.hcr.utils.RedisStreamUtil;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.RecordId;
    import org.springframework.data.redis.connection.stream.StreamInfo;
    import org.springframework.data.redis.stream.StreamListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    @Slf4j
    @RequiredArgsConstructor
    public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {
    
        public final RedisStreamUtil redisStreamUtil;
    
        /**
         * Handles one record received from the stream.
         *
         * @param message the received record
         */
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
            // Key of the stream the record came from
            String streamKey = message.getStream();
            // Record ID
            RecordId recordId = message.getId();
            // Record body
            Map<String, String> msg = message.getValue();
            log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);
    
            // Business logic goes here
    
            // After processing, ack the record for every consumer group on the stream, then delete it
            StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);
            xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
            redisStreamUtil.del(streamKey, recordId.getValue());
        }
    }
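
    The listener acknowledges a record only after its processing logic has run. With a consumer group, Redis keeps every delivered but unacknowledged record in the group's pending entries list, and XACK removes it and reports how many records were actually acknowledged. The following is a toy in-memory model of that bookkeeping, written against the JDK only so it runs without a Redis server. The class `PendingListModel` and its methods are hypothetical and do not mirror any Spring Data Redis API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model (an assumption for illustration, not Redis itself): tracks which
// record IDs one consumer group has received but not yet acknowledged.
final class PendingListModel {
    private final Set<String> pending = new LinkedHashSet<>();

    // Models delivery of a record to a consumer in the group: it becomes pending.
    void deliver(String recordId) {
        pending.add(recordId);
    }

    // Models XACK: removes the given IDs and returns how many were actually pending.
    long ack(String... recordIds) {
        long acknowledged = 0;
        for (String id : recordIds) {
            if (pending.remove(id)) {
                acknowledged++;
            }
        }
        return acknowledged;
    }

    // Number of records delivered but not yet acknowledged.
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingListModel group = new PendingListModel();
        group.deliver("1702623008044-0");
        System.out.println("pending after delivery: " + group.pendingCount());
        System.out.println("first ack: " + group.ack("1702623008044-0"));
        System.out.println("second ack: " + group.ack("1702623008044-0"));
    }
}
```

    In this model, acknowledging the same ID a second time counts nothing, which is why the `Long` returned by `ack(...)` in RedisStreamUtil can be worth logging. If the processing logic throws before the ack, the record simply stays pending.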
    

    Create the RedisConfig configuration class and register the listener:

    package cn.hcr.config;
    
    import cn.hcr.listener.RedisConsumersListener;
    import cn.hcr.utils.RedisStreamUtil;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.extern.slf4j.Slf4j;
    import lombok.var;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.stream.Consumer;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.ReadOffset;
    import org.springframework.data.redis.connection.stream.StreamOffset;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import org.springframework.data.redis.stream.StreamMessageListenerContainer;
    import org.springframework.data.redis.stream.Subscription;
    
    import javax.annotation.Resource;
    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Configuration
    @Slf4j
    public class RedisConfig {
    
        @Resource
        private RedisStreamUtil redisStreamUtil;
    
        /**
         * Configures Redis serialization.
         *
         * @param redisConnectionFactory the Redis connection factory
         * @return {@code RedisTemplate<String, Object>}
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(redisConnectionFactory);
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            template.setKeySerializer(stringRedisSerializer);
            template.setHashKeySerializer(stringRedisSerializer);
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @Bean
        public Subscription subscription(RedisConnectionFactory factory) {
            AtomicInteger index = new AtomicInteger(1);
            int processors = Runtime.getRuntime().availableProcessors();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(), r -> {
                Thread thread = new Thread(r);
                thread.setName("async-stream-consumer-" + index.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            });
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                    StreamMessageListenerContainer
                            .StreamMessageListenerContainerOptions
                            .builder()
                            // Maximum number of records fetched per poll
                            .batchSize(5)
                            .executor(executor)
                            .pollTimeout(Duration.ofSeconds(1))
                            .errorHandler(throwable -> {
                                log.error("[MQ handler exception]", throwable);
                                throwable.printStackTrace();
                            })
                            .build();
            
            // The stream key and group name can be customized as needed
            String streamName = "mystream";
            String groupname = "mygroup";
    
            initStream(streamName, groupname);
            var listenerContainer = StreamMessageListenerContainer.create(factory, options);
            // Manual ack
            Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
            // Automatic ack
               /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                        StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
            listenerContainer.start();
            return subscription;
        }
    
        private void initStream(String key, String group) {
            boolean hasKey = redisStreamUtil.hasKey(key);
            if (!hasKey) {
                Map<String, Object> map = new HashMap<>(1);
                map.put("field", "value");
                // Create the stream by adding a placeholder record
                String result = redisStreamUtil.addMap(key, map);
                // Create the consumer group
                redisStreamUtil.oup(key, group);
                // Delete the placeholder record
                redisStreamUtil.del(key, result);
                log.info("stream:{}-group:{} initialize success", key, group);
            }
        }
    }

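    redisTemplate: this bean configures Redis serialization (String keys, Jackson JSON values).

    subscription: configures the listener container and subscribes the listener to the stream.

    initStream: creates the stream and its consumer group if the key does not exist yet.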
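
    The subscription bean gives the listener container its own executor, whose threads are named async-stream-consumer-N and marked as daemon threads, so they are easy to spot in logs and thread dumps and do not keep the JVM alive at shutdown. The thread factory can be tried in isolation with the JDK alone. The class name `ConsumerThreadFactoryDemo` and the method `namedDaemonFactory` are assumptions made for this sketch.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the thread factory used in the subscription bean.
// The class and method names are hypothetical; the naming scheme matches the article's code.
final class ConsumerThreadFactoryDemo {

    // Returns a factory whose threads are daemons named prefix + 1, prefix + 2, ...
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger index = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(prefix + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    public static void main(String[] args) {
        ThreadFactory factory = namedDaemonFactory("async-stream-consumer-");
        Thread first = factory.newThread(() -> { });
        Thread second = factory.newThread(() -> { });
        System.out.println(first.getName() + " daemon=" + first.isDaemon());
        System.out.println(second.getName() + " daemon=" + second.isDaemon());
    }
}
```

    The counter lives inside the factory, so each executor built this way numbers its threads independently starting from 1.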

    Listener test

    After sending a message with addMessage(), check the console output.

    【streamKey】= mystream,
    【recordId】= 1702623008044-0,
    【msg】=
    {user=[
        "cn.hcr.pojo.RedisUser",
        {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
        ]
    }
    

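    Summary

    That completes a simple demo of a Redis Stream queue in Spring Boot. If you need the source code or something is unclear, leave a comment or send a private message.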
