Dapr牵手.NET学习笔记:发布-订阅

发布一下 0 0

queue,是很好的削峰填谷工具,在业内也是主流;发布订阅,可以有效地解耦两个应用,所以dapr把它们进行了有效的封装,我们使用起来更简单高效。

本篇的案例是下完订单后,会把消息发布到redis(当然也可以是其他)中,通知系统和支付系统会订单这个消息,同时,通知系统和支付系统的两个实例中,只会有一个实例接收到这个消息,进行处理,调用示意图如下:

Dapr牵手.NET学习笔记:发布-订阅


项目结构如下:

Dapr牵手.NET学习笔记:发布-订阅

一、配置

用docker-compose部署,docker-compose.yml内容

version: '3.4'services:  #┌────────────────────────────────┐  #│ ordersystem app + Dapr sidecar │  #└────────────────────────────────┘  ordersystem:    image: ${DOCKER_REGISTRY-}ordersystem    depends_on:      - redis      - placement    build:      context: ../      dockerfile: OrderSystem/Dockerfile    ports:      - "3500:3500"    volumes:         - ../OrderSystem:/OrderSystem      networks:      - b2c-dapr  ordersystem-dapr:     image: "daprio/daprd:latest"     command: [ "./daprd", "-app-id", "order", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components"]     depends_on:       - ordersystem     network_mode: "service:ordersystem"     volumes:         - ../components:/components      #┌───────────────────────────────────┐  #│ paymentsystem1 app + Dapr sidecar │  #└───────────────────────────────────┘    paymentsystem1:    image: ${DOCKER_REGISTRY-}paymentsystem    build:      context: ../      dockerfile: PaymentSystem/Dockerfile    ports:      - "3601:3500"    volumes:         - ../PaymentSystem:/PaymentSystem          networks:      - b2c-dapr        paymentsystem1-dapr:     image: "daprio/daprd:latest"     command: [ "./daprd", "-app-id", "pay", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ]     depends_on:       - paymentsystem1     network_mode: "service:paymentsystem1"     volumes:         - ../components:/components    #┌───────────────────────────────────┐  #│ paymentsystem2 app + Dapr sidecar │  #└───────────────────────────────────┘     paymentsystem2:    image: ${DOCKER_REGISTRY-}paymentsystem    build:      context: ../      dockerfile: PaymentSystem/Dockerfile    volumes:         - ../PaymentSystem:/PaymentSystem                ports:      - "3602:3500"    networks:      - b2c-dapr        paymentsystem2-dapr:     image: "daprio/daprd:latest"     command: [ "./daprd", "-app-id", "pay", "-app-port", "80" ,"-placement-host-address", "placement:50006","-components-path","/components"]     depends_on:       - paymentsystem2     network_mode: "service:paymentsystem2"     volumes:         - ../components:/components         #┌───────────────────────────────────┐  #│ noticesystem1 app + Dapr sidecar │  #└───────────────────────────────────┘    noticesystem1:    image: ${DOCKER_REGISTRY-}noticesystem    build:      context: ../      dockerfile: NoticeSystem/Dockerfile    ports:      - "3701:3500"    volumes:         - ../NoticeSystem:/NoticeSystem          networks:      - b2c-dapr        noticesystem1-dapr:     image: "daprio/daprd:latest"     command: [ "./daprd", "-app-id", "notice", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ]     depends_on:       - noticesystem1     network_mode: "service:noticesystem1"     volumes:         - ../components:/components   #┌───────────────────────────────────┐  #│ noticesystem2 app + Dapr sidecar │  #└───────────────────────────────────┘    noticesystem2:    image: ${DOCKER_REGISTRY-}noticesystem    build:      context: ../      dockerfile: NoticeSystem/Dockerfile    ports:      - "3702:3500"    volumes:         - ../NoticeSystem:/NoticeSystem          networks:      - b2c-dapr        noticesystem2-dapr:     image: "daprio/daprd:latest"     command: [ "./daprd", "-app-id", "notice", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ]     depends_on:       - noticesystem2     network_mode: "service:noticesystem2"     volumes:         - ../components:/components   #┌────────────────────────┐  #│ Dapr placement service │  #└────────────────────────┘    placement:    image: "daprio/dapr"    command: ["./placement", "-port", "50006"]    ports:      - "50006:50006"    networks:      - b2c-dapr  #┌───────────────────┐  #│ Redis state store │  #└───────────────────┘    redis:    image: "redis:latest"    ports:      - "6380:6379"    networks:      - b2c-daprnetworks:    b2c-dapr:

pubsub.yaml(在components文件夹下 )内容是默认,如下

apiVersion: dapr.io/v1alpha1kind: Componentmetadata:  name: pubsubspec:  type: pubsub.redis  version: v1  metadata:  - name: redisHost    value: redis:6379  - name: redisPassword    value: ""

订阅配置文件如下subscription.yaml(在components文件夹下 )

apiVersion: dapr.io/v1alpha1kind: Subscriptionmetadata:  name: myevent-subscriptionspec:  topic: orderComplete  route: /ordercomplete  pubsubname: pubsubscopes:- pay- notice

二、代码

OrderSystem项目的appsettings.json

 "PublishUrl": "http://localhost:3500/v1.0/publish/pubsub/orderComplete"

OrderSystem项目的发布方法

    [HttpGet("/orderpub/{orderno}")]    public async Task<IActionResult> OrderPub(string orderno)    {        try        {            _logger.LogInformation($"Order,publish");            await Task.Delay(400);            var client = _clientFactory.CreateClient();            var stringContent = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(new { OrderNo = orderno, Amount = 30000, OrderTime = DateTime.UtcNow}), System.Text.Encoding.UTF8, "application/json");            _logger.LogInformation(stringContent.ToString());            var content = await client.PostAsync(_publishUrl, stringContent);            return new JsonResult(new { order_result = "Order success,and publish", pay_result = content });        }        catch (Exception exc)        {            _logger.LogCritical(exc, exc.Message);            return new JsonResult(new { order_result = "Order success,and publish,pay exception", message = exc.Message });        }    }

PaymentSystem和NoticeSystem项目中的订阅实现

两个实体类

public class PubBody{    public string id { get; set; }    public string source { get; set; }    public string pubsubname { get; set; }    public string traceid { get; set; }    public PubOrder data { get; set; }    public string specversion { get; set; }    public string datacontenttype { get; set; }    public string type { get; set; }    public string topic { get; set; }}public class PubOrder{    public string OrderNo { get; set; }    public decimal Amount { get; set; }    public DateTime OrderTime { get; set; }}

NoticeSystem和PaymentSystem两个项目中的订阅方法如下

    [HttpPost("/ordercomplete")]    public async Task<IActionResult> OrderComplete()    {        try        {            _logger.LogInformation("PaymentSystem OrderComplete runing……");            using var reader = new StreamReader(Request.Body, System.Text.Encoding.UTF8);            var content = await reader.ReadToEndAsync();            var pubBody = Newtonsoft.Json.JsonConvert.DeserializeObject<PubBody>(content);            _logger.LogInformation($"---------  HostName:{Dns.GetHostName()},OrderNo:{pubBody?.data.OrderNo},OrderAmount:{pubBody?.data.Amount},OrderTime:{pubBody?.data.OrderTime} -----------");            await Task.Delay(200);            _logger.LogInformation($"subscription pay complete");            _logger.LogInformation($"return  SUCCESS");            return new JsonResult(new            {                Status = "SUCCESS"            });        }        catch (Exception exc)        {            _logger.LogCritical(exc, exc.Message);            _logger.LogInformation($"return  RETRY");            return new JsonResult(new            {                Status = "RETRY"            });        }    }

三、发布测试

进入在B2C目发,用命令行启动docker compose

docker-compose up -d

可以测试了,调用OrderSystem的对外地址,下订单NO0001,和NO0002

localhost:3500/v1.0/invoke/order/method/orderpub/NO0001和

localhost:3500/v1.0/invoke/order/method/orderpub/NO0001

查看容器noticesystem1

Dapr牵手.NET学习笔记:发布-订阅

查看容器noticesystem2

Dapr牵手.NET学习笔记:发布-订阅

查看容器paymentsystem1

Dapr牵手.NET学习笔记:发布-订阅

查看容器paymentsystem2

Dapr牵手.NET学习笔记:发布-订阅


NoticeSystem和PaymentSystem同时订阅OrderSystem项目的发布orderComplete,两个实例会轮询处理订阅结果。Dapr就这样,把复杂的发布订阅,封装成一个api一样的简单调用和接收,项目中没有一点的痕迹。

版权声明:内容来源于互联网和用户投稿 如有侵权请联系删除

本文地址:http://0561fc.cn/87703.html