spring boot 集成Rsocket,服务端、客户端

一、简介

RSocket 是一种二进制协议,可用于 TCP、WebSockets 和 Aeron 等字节流传输的应用协议,具有以下交互模型:

1、Request-Response: 发送一条信息,接收一条信息。

2、Request-Stream: 发送一条消息并接收返回的消息流。

3、Channel: 双向发送消息流。

4、Fire-and-Forget: 发送单向消息。
二、服务端代码

1、安装依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

2、配置文件添加如下:

spring:
  rsocket:
    server:
      port: 9898
      transport: tcp

3、服务端测试代码

package com.example.rsocketservice.controller;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Random;

@RestController
public class SendController {

    //Request-Response模式
    @MessageMapping("message")
    public Mono<String> handleMessage(Mono<String> message) {
        return message.doOnNext(msg -> {
            System.out.printf("接收到消息:%s%n", msg) ;
        }).map(msg -> "服务器成功收到了你的消息!!!") ;
    }

    //Request-Stream模式
    // 必须返回Flux
    @MessageMapping("stream")
    public Flux<String> handleStream() {
        return Flux
                .interval(Duration.ofSeconds(2))
                // 随机生成
                .map(i -> String.valueOf(new Random().nextInt(10000000)))
                // 只在此通道中获取10个值
                .take(10)
                .doOnComplete(() -> {
                    System.out.println("completed...") ;
                }) ;
    }

    //Channel模式
    @MessageMapping("channel")
    public Flux<String> handleChannel(Flux<String> datas) {
        return datas.doOnNext(ret -> {
            System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;
        }).map(ret -> {
            return ret + " - " + new Random().nextInt(1000) ;
        }) ;
    }

    //Fire-and-Forget模式
    @MessageMapping("faf")
    public Mono<Void> handleFireAndForget(Mono<String> data) {
        return data.doOnNext(ret -> {
            System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;
        }).then() ;
    }
}

三、客户端测试代码

1、安装依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

2、新建配置类ClientConfiguration

package com.example.rsocketclient.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;


@Configuration
public class ClientConfiguration {
    @Bean
    RSocketRequester rSocketRequester(/*RSocketStrategies rSocketStrategies*/) {
        RSocketStrategies strategies = RSocketStrategies.builder()
//                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
//                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
                .build();

        RSocketRequester requester = RSocketRequester.builder()
                .rsocketStrategies(strategies)
                .tcp("localhost", 9898);

        return requester;
    }
}

3、测试代码

package com.example.rsocketclient.controller;

import jakarta.annotation.Resource;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Random;

@RestController
public class TestController {
    @Resource
    private RSocketRequester rsocketRequester;


    //Request-Response模式
    @GetMapping("/message/{body}")
    // Request-Response 发送一条信息,接收一条信息。
    public void sendMessage(@PathVariable("body") String body) {
        this.rsocketRequester
                .route("message")
                .data(body)
                .retrieveMono(String.class)
                .subscribe(System.out::println) ;
    }

    //Request-Stream模式
    @GetMapping("stream")
    public void sendStream() {
        this.rsocketRequester
                .route("stream")
                .retrieveFlux(String.class)
                .subscribe(ret -> {
                    System.out.printf("%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;
                }) ;
    }

    @GetMapping("channel")
    // Channel 双向发送消息流。
    public void sendChannel() {
        this.rsocketRequester
                .route("channel")
                .data(Flux.just("1", "2", "3", "4", "5", "6").delayElements(Duration.ofSeconds(1)))
                .retrieveFlux(String.class)
                .subscribe(ret -> {
                    System.out.printf("【client】%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;
                }) ;
    }

    @GetMapping("sendFireAndForget")
    // Fire-and-Forget 发送单向消息。
    public void sendFireAndForget() {
        this.rsocketRequester
                .route("faf")
                .data(Mono.just(String.valueOf(new Random().nextInt(1000))))
                .send()
                .subscribe() ;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/773439.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue配置sql规则

vue配置sql规则 实现效果组件完整代码父组件 前端页面实现动态配置sql条件&#xff0c;将JSON结构给到后端&#xff0c;后端进行sql组装。 这里涉及的分组后端在组装时用括号将这块规则括起来就行&#xff0c;分组的sql连接符&#xff08;并且/或者&#xff09;取组里的第一个。…

论文配色:跟着顶刊学配色(Nature篇)

写在前面&#xff1a; 截至目前&#xff0c;nature共发表Article 572篇&#xff0c;本文挑选了部分最新的文献&#xff0c;进行配色总结&#xff0c;每种颜色分别提供十六进制、RGB、HSB、CMYK和LAB5种描述模型&#xff0c;方便后期配色使用。 三色&#xff1a; 四色&#xff…

Java增加线程后kafka仍然消费很慢

文章目录 一、问题分析二、控制kafka消费速度属性三、案例描述 一、问题分析 Java增加线程通常是为了提高程序的并发处理能力&#xff0c;但如果Kafka仍然消费很慢&#xff0c;可能的原因有&#xff1a; 网络延迟较大&#xff1a;如果网络延迟较大&#xff0c;即使开启了多线…

【MindSpore学习打卡】应用实践-计算机视觉-SSD目标检测:从理论到实现

在计算机视觉领域&#xff0c;目标检测是一个至关重要的任务。它不仅要求识别图像中的目标物体&#xff0c;还需要精确定位这些物体的位置。近年来&#xff0c;随着深度学习技术的飞速发展&#xff0c;各种高效的目标检测算法层出不穷。SSD&#xff08;Single Shot MultiBox De…

推动高效能:东芝TB67H301FTG全桥直流电机驱动IC

在如今高度自动化的时代&#xff0c;电子产品的性能和效率成为了工程师们关注的焦点。东芝的TB67H301FTG全桥直流电机驱动IC应运而生&#xff0c;以其卓越的技术和可靠性&#xff0c;成为众多应用的理想选择。无论是在机器人、家用电器、工业自动化&#xff0c;还是在其他需要精…

企业怎么选购USB Server?先看这条!

一、首先&#xff0c;USB Server是什么&#xff1f; USB Server&#xff1f;听起来像是个高科技玩意儿&#xff01; 其实&#xff0c;它就是个很多企业都在用的远程“传送门”&#xff0c;把USB设备都固定插在USB Server上&#xff0c;然后将USB Server与计算机网络连接&…

LaTeX表格灵活设置列宽

一些基本的插入表格的操作见&#xff1a;https://blog.csdn.net/gsgbgxp/article/details/129457872 遇到问题先查阅《IShort》和刘海洋老师的《LaTeX入门》。 设置表格列宽基础操作&#xff08;不借助tabularx&#xff09; 先从一个简单表格开始 \begin{table}[!h]\centeri…

Python基础小知识问答系列-过滤列表元素

1. 问题&#xff1a; 如何根据单一条件过滤列表的元素&#xff1f; 如何根据复杂条件过滤列表的元素&#xff1f; 2. 解决方式&#xff1a; 可以使用推导式生成器&#xff0c;进行单一条件的列表元素过滤&#xff0c;尤其是列表内容较多时; 也可以使用filter函数进行列…

怎么看一家TPM管理咨询公司专不专业

在评估一家TPM管理咨询公司是否专业时&#xff0c;我们需要从多个维度进行深入的考量。TPM作为一种以提升设备综合效率为目标&#xff0c;以全系统的预防维修为过程&#xff0c;以全体人员参与为基础的设备保养和维修管理体系&#xff0c;其实施的成功与否直接关系到企业的生产…

二二复制模式,发展下属并形成一个销售网络体系来实现收入增长!

二二复制模式&#xff0c;又称为双轨制&#xff0c;是一种直销理念的营销模式&#xff0c;其核心在于通过发展下属并形成一个销售网络体系来实现收入增长。以下是对二二复制模式的详细讲解&#xff0c;包括其优势和玩法介绍&#xff0c;以及适合的行业。 一、二二复制模式的定…

刚办理的手机号被停用,你可能遇到这些问题了!

很多朋友都会遇到手机号被停用的情况&#xff0c;那么你知道你的手机号为什么会被停用吗&#xff1f;接下来&#xff0c;关于手机号被停用的问题&#xff0c;跟着小编一块来了解一下吧。 ​停机的两种形态&#xff1a; 1、第一个是局方停机&#xff0c;即语音、短信和流量都不…

opencv实现人脸检测功能----20240704

opencv实现人脸检测 早在 2017 年 8 月,OpenCV 3.3 正式发布,带来了高度改进的“深度神经网络”(dnn)模块。 该模块支持多种深度学习框架,包括 Caffe、TensorFlow 和 Torch/PyTorch。OpenCV 的官方版本中包含了一个更准确、基于深度学习的人脸检测器, 链接:基于深度学习…

Day04-SonarQube

Day04-SonarQube 1.SonarQube基本概述1.1 什么是SonarQube1.2 使用SonarQube前提环境要求 2. SonarQube服务安装-8.9 lts (PostgreSQL)2.1 环境准备2.2 安装Sonarqube依赖工具 -PSQL 2.SonarQube服务安装-7.7 (MySQL)故障与排查 3.Sonarqube插件管理4. 创建项目及分析1) 分析ja…

简历–求职信–通用

每个毕业生的简历首页大概都会是一封求职信。如果说对求职者的简历正文我们只是浮光掠影看上几眼的话&#xff0c;那么对求职信&#xff0c;简直连浮光掠影都称不上。说实话&#xff0c;我在看求职者简历的时候一般会把这一页翻过去&#xff0c;很少去看。为什么呢&#xff1f;…

springboot宠物领养系统-计算机毕业设计源码08373

目 录 摘要 1 绪论 1.1选题依据 1.2国内外研究现状 1.3相关技术介绍 1.4论文结构与章节安排 2 基于springboot宠物领养系统系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2经济可行性分析 2.1.3操作可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功…

java 常见错误问题

1.java中datetime数据类型如何定义 在Java中&#xff0c;可以使用java.time包中的DateTime类来定义DateTime数据类型。 要定义DateTime数据类型&#xff0c;你可以使用以下代码&#xff1a; public static void test() {// 获取当前日期和时间LocalDateTime datetime Local…

如何利用AI撰写短文案获客?分享6大平台和3大步骤!

从去年开始&#xff0c;很多大厂都在裁员&#xff0c;原因就是因为AI的火爆&#xff0c;替代了很多机械式的劳动力。以前很多人可以通过机械式的工作来摸鱼&#xff0c;现在AI完成的效率比人工的要高很多倍。 国内好用的AI平台非常多&#xff0c;有时候也可以使用几个AI平台结合…

RAG 工业落地方案框架(Qanything、RAGFlow、FastGPT、智谱RAG)细节比对!CVPR自动驾驶最in挑战赛赛道,全球冠军被算力选手夺走了

RAG 工业落地方案框架&#xff08;Qanything、RAGFlow、FastGPT、智谱RAG&#xff09;细节比对&#xff01;CVPR自动驾驶最in挑战赛赛道&#xff0c;全球冠军被算力选手夺走了。 本文详细比较了四种 RAG 工业落地方案 ——Qanything、RAGFlow、FastGPT 和智谱 RAG&#xff0c;重…

后端之路——最规范、便捷的spring boot工程配置

一、参数配置化 上一篇我们学了阿里云OSS的使用&#xff0c;那么我们为了方便使用OSS来上传文件&#xff0c;就创建了一个【util】类&#xff0c;里面有一个【AliOSSUtils】类&#xff0c;虽然本人觉得没啥不方便的&#xff0c;但是黑马视频又说这样还是存在不便维护和管理问题…

Artificial Intelligence Self-study

Artificial Intelligence Self-study Traditional AI (Symbolic AI) 基于&#xff1a;符号表示 数理逻辑 搜索 - 有明确规则&#xff0c;依靠算力。Appliance &#xff1a; 数学难题(Heuristic Algorithm)&#xff0c;棋牌对抗(围棋)&#xff0c;专家系统(输入病症&#xf…