<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Go 語言實現 gRPC 的發布訂閱模式,REST 接口和超時控制

      原文鏈接: 測試小姐姐問我 gRPC 怎么用,我直接把這篇文章甩給了她

      上篇文章 gRPC,爆贊 直接爆了,內容主要包括:簡單的 gRPC 服務,流處理模式,驗證器,Token 認證和證書認證。

      在多個平臺的閱讀量都創了新高,在 oschina 更是獲得了首頁推薦,閱讀量到了 1w+,這已經是我單篇閱讀的高峰了。

      看來只要用心寫還是有收獲的。

      這篇咱們還是從實戰出發,主要介紹 gRPC 的發布訂閱模式,REST 接口和超時控制。

      相關代碼我會都上傳到 GitHub,感興趣的小伙伴可以去查看或下載。

      發布和訂閱模式

      發布訂閱是一個常見的設計模式,開源社區中已經存在很多該模式的實現。其中 docker 項目中提供了一個 pubsub 的極簡實現,下面是基于 pubsub 包實現的本地發布訂閱代碼:

      package main
      
      import (
      	"fmt"
      	"strings"
      	"time"
      
      	"github.com/moby/moby/pkg/pubsub"
      )
      
      func main() {
      	p := pubsub.NewPublisher(100*time.Millisecond, 10)
      
      	golang := p.SubscribeTopic(func(v interface{}) bool {
      		if key, ok := v.(string); ok {
      			if strings.HasPrefix(key, "golang:") {
      				return true
      			}
      		}
      		return false
      	})
      	docker := p.SubscribeTopic(func(v interface{}) bool {
      		if key, ok := v.(string); ok {
      			if strings.HasPrefix(key, "docker:") {
      				return true
      			}
      		}
      		return false
      	})
      
      	go p.Publish("hi")
      	go p.Publish("golang: https://golang.org")
      	go p.Publish("docker: https://www.docker.com/")
      	time.Sleep(1)
      
      	go func() {
      		fmt.Println("golang topic:", <-golang)
      	}()
      	go func() {
      		fmt.Println("docker topic:", <-docker)
      	}()
      
      	<-make(chan bool)
      }
      
      

      這段代碼首先通過 pubsub.NewPublisher 創建了一個對象,然后通過 p.SubscribeTopic 實現訂閱,p.Publish 來發布消息。

      執行效果如下:

      docker topic: docker: https://www.docker.com/
      golang topic: golang: https://golang.org
      fatal error: all goroutines are asleep - deadlock!
      
      goroutine 1 [chan receive]:
      main.main()
      	/Users/zhangyongxin/src/go-example/grpc-example/pubsub/server/pubsub.go:43 +0x1e7
      exit status 2
      

      訂閱消息可以正常打印。

      但有一個死鎖報錯,是因為這條語句 <-make(chan bool) 引起的。但是如果沒有這條語句就不能正常打印訂閱消息。

      這里就不是很懂了,有沒有大佬知道,歡迎留言,求指導。

      接下來就用 gRPC 和 pubsub 包實現發布訂閱模式。

      需要實現四個部分:

      1. proto 文件;
      2. 服務端: 用于接收訂閱請求,同時也接收發布請求,并將發布請求轉發給訂閱者;
      3. 訂閱客戶端: 用于從服務端訂閱消息,處理消息;
      4. 發布客戶端: 用于向服務端發送消息。

      proto 文件

      首先定義 proto 文件:

      syntax = "proto3";
      
      package proto;
       
      message String {
          string value = 1;
      }
       
      service PubsubService {
          rpc Publish (String) returns (String);
          rpc SubscribeTopic (String) returns (stream String);
          rpc Subscribe (String) returns (stream String);
      }
      

      定義三個方法,分別是一個發布 Publish 和兩個訂閱 SubscribeSubscribeTopic

      Subscribe 方法接收全部消息,而 SubscribeTopic 根據特定的 Topic 接收消息。

      服務端

      package main
      
      import (
      	"context"
      	"fmt"
      	"log"
      	"net"
      	"server/proto"
      	"strings"
      	"time"
      
      	"github.com/moby/moby/pkg/pubsub"
      	"google.golang.org/grpc"
      	"google.golang.org/grpc/reflection"
      )
      
      type PubsubService struct {
      	pub *pubsub.Publisher
      }
      
      func (p *PubsubService) Publish(ctx context.Context, arg *proto.String) (*proto.String, error) {
      	p.pub.Publish(arg.GetValue())
      	return &proto.String{}, nil
      }
      
      func (p *PubsubService) SubscribeTopic(arg *proto.String, stream proto.PubsubService_SubscribeTopicServer) error {
      	ch := p.pub.SubscribeTopic(func(v interface{}) bool {
      		if key, ok := v.(string); ok {
      			if strings.HasPrefix(key, arg.GetValue()) {
      				return true
      			}
      		}
      		return false
      	})
      
      	for v := range ch {
      		if err := stream.Send(&proto.String{Value: v.(string)}); nil != err {
      			return err
      		}
      	}
      	return nil
      }
      
      func (p *PubsubService) Subscribe(arg *proto.String, stream proto.PubsubService_SubscribeServer) error {
      	ch := p.pub.Subscribe()
      
      	for v := range ch {
      		if err := stream.Send(&proto.String{Value: v.(string)}); nil != err {
      			return err
      		}
      	}
      	return nil
      }
      
      func NewPubsubService() *PubsubService {
      	return &PubsubService{pub: pubsub.NewPublisher(100*time.Millisecond, 10)}
      }
      
      func main() {
      	lis, err := net.Listen("tcp", ":50051")
      	if err != nil {
      		log.Fatalf("failed to listen: %v", err)
      	}
      
      	// 簡單調用
      	server := grpc.NewServer()
      	// 注冊 grpcurl 所需的 reflection 服務
      	reflection.Register(server)
      	// 注冊業務服務
      	proto.RegisterPubsubServiceServer(server, NewPubsubService())
      
      	fmt.Println("grpc server start ...")
      	if err := server.Serve(lis); err != nil {
      		log.Fatalf("failed to serve: %v", err)
      	}
      }
      

      對比之前的發布訂閱程序,其實這里是將 *pubsub.Publisher 作為了 gRPC 的結構體 PubsubService 的一個成員。

      然后還是按照 gRPC 的開發流程,實現結構體對應的三個方法。

      最后,在注冊服務時,將 NewPubsubService() 服務注入,實現本地發布訂閱功能。

      訂閱客戶端

      package main
      
      import (
      	"client/proto"
      	"context"
      	"fmt"
      	"io"
      	"log"
      
      	"google.golang.org/grpc"
      )
      
      func main() {
      	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
      	if err != nil {
      		log.Fatal(err)
      	}
      	defer conn.Close()
      
      	client := proto.NewPubsubServiceClient(conn)
      	stream, err := client.Subscribe(
      		context.Background(), &proto.String{Value: "golang:"},
      	)
      	if nil != err {
      		log.Fatal(err)
      	}
      
      	go func() {
      		for {
      			reply, err := stream.Recv()
      			if nil != err {
      				if io.EOF == err {
      					break
      				}
      				log.Fatal(err)
      			}
      			fmt.Println("sub1: ", reply.GetValue())
      		}
      	}()
      
      	streamTopic, err := client.SubscribeTopic(
      		context.Background(), &proto.String{Value: "golang:"},
      	)
      	if nil != err {
      		log.Fatal(err)
      	}
      
      	go func() {
      		for {
      			reply, err := streamTopic.Recv()
      			if nil != err {
      				if io.EOF == err {
      					break
      				}
      				log.Fatal(err)
      			}
      			fmt.Println("subTopic: ", reply.GetValue())
      		}
      	}()
      
      	<-make(chan bool)
      }
      

      新建一個 NewPubsubServiceClient 對象,然后分別實現 client.Subscribeclient.SubscribeTopic 方法,再通過 goroutine 不停接收消息。

      發布客戶端

      package main
      
      import (
      	"client/proto"
      	"context"
      	"log"
      
      	"google.golang.org/grpc"
      )
      
      func main() {
      	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
      	if err != nil {
      		log.Fatal(err)
      	}
      	defer conn.Close()
      	client := proto.NewPubsubServiceClient(conn)
      
      	_, err = client.Publish(
      		context.Background(), &proto.String{Value: "golang: hello Go"},
      	)
      	if err != nil {
      		log.Fatal(err)
      	}
      
      	_, err = client.Publish(
      		context.Background(), &proto.String{Value: "docker: hello Docker"},
      	)
      	if nil != err {
      		log.Fatal(err)
      	}
      
      }
      

      新建一個 NewPubsubServiceClient 對象,然后通過 client.Publish 方法發布消息。

      當代碼全部寫好之后,我們開三個終端來測試一下:

      終端1 上啟動服務端:

      go run main.go
      

      終端2 上啟動訂閱客戶端:

      go run sub_client.go
      

      終端3 上執行發布客戶端:

      go run pub_client.go
      

      這樣,在 終端2 上就有對應的輸出了:

      subTopic:  golang: hello Go
      sub1:  golang: hello Go
      sub1:  docker: hello Docker
      

      也可以再多開幾個訂閱終端,那么每一個訂閱終端上都會有相同的內容輸出。

      源碼地址: GitHub

      REST 接口

      gRPC 一般用于集群內部通信,如果需要對外提供服務,大部分都是通過 REST 接口的方式。開源項目 grpc-gateway 提供了將 gRPC 服務轉換成 REST 服務的能力,通過這種方式,就可以直接訪問 gRPC API 了。

      但我覺得,實際上這么用的應該還是比較少的。如果提供 REST 接口的話,直接寫一個 HTTP 服務會方便很多。

      proto 文件

      第一步還是創建一個 proto 文件:

      syntax = "proto3";
      
      package proto;
      
      import "google/api/annotations.proto";
      
      message StringMessage {
        string value = 1;
      }
      
      service RestService {
          rpc Get(StringMessage) returns (StringMessage) {
              option (google.api.http) = {
                  get: "/get/{value}"
              };
          }
          rpc Post(StringMessage) returns (StringMessage) {
              option (google.api.http) = {
                  post: "/post"
                  body: "*"
              };
          }
      }
      

      定義一個 REST 服務 RestService,分別實現 GETPOST 方法。

      安裝插件:

      go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
      

      生成對應代碼:

      protoc -I/usr/local/include -I. \
          -I$GOPATH/pkg/mod \
          -I$GOPATH/pkg/mod/github.com/grpc-ecosystem/grpc-gateway@v1.16.0/third_party/googleapis \
          --grpc-gateway_out=. --go_out=plugins=grpc:.\
          --swagger_out=. \
          helloworld.proto
      

      --grpc-gateway_out 參數可生成對應的 gw 文件,--swagger_out 參數可生成對應的 API 文檔。

      在我這里生成的兩個文件如下:

      helloworld.pb.gw.go
      helloworld.swagger.json
      

      REST 服務

      package main
      
      import (
      	"context"
      	"log"
      	"net/http"
      
      	"rest/proto"
      
      	"github.com/grpc-ecosystem/grpc-gateway/runtime"
      	"google.golang.org/grpc"
      )
      
      func main() {
      	ctx := context.Background()
      	ctx, cancel := context.WithCancel(ctx)
      	defer cancel()
      
      	mux := runtime.NewServeMux()
      
      	err := proto.RegisterRestServiceHandlerFromEndpoint(
      		ctx, mux, "localhost:50051",
      		[]grpc.DialOption{grpc.WithInsecure()},
      	)
      	if err != nil {
      		log.Fatal(err)
      	}
      
      	http.ListenAndServe(":8080", mux)
      }
      

      這里主要是通過實現 gw 文件中的 RegisterRestServiceHandlerFromEndpoint 方法來連接 gRPC 服務。

      gRPC 服務

      package main
      
      import (
      	"context"
      	"net"
      
      	"rest/proto"
      
      	"google.golang.org/grpc"
      )
      
      type RestServiceImpl struct{}
      
      func (r *RestServiceImpl) Get(ctx context.Context, message *proto.StringMessage) (*proto.StringMessage, error) {
      	return &proto.StringMessage{Value: "Get hi:" + message.Value + "#"}, nil
      }
      
      func (r *RestServiceImpl) Post(ctx context.Context, message *proto.StringMessage) (*proto.StringMessage, error) {
      	return &proto.StringMessage{Value: "Post hi:" + message.Value + "@"}, nil
      }
      
      func main() {
      	grpcServer := grpc.NewServer()
      	proto.RegisterRestServiceServer(grpcServer, new(RestServiceImpl))
      	lis, _ := net.Listen("tcp", ":50051")
      	grpcServer.Serve(lis)
      }
      

      gRPC 服務的實現方式還是和以前一樣。

      以上就是全部代碼,現在來測試一下:

      啟動三個終端:

      終端1 啟動 gRPC 服務:

      go run grpc_service.go
      

      終端2 啟動 REST 服務:

      go run rest_service.go
      

      終端3 來請求 REST 服務:

      $ curl localhost:8080/get/gopher
      {"value":"Get hi:gopher"}
      
      $ curl localhost:8080/post -X POST --data '{"value":"grpc"}'
      {"value":"Post hi:grpc"}
      

      源碼地址: GitHub

      超時控制

      最后一部分介紹一下超時控制,這部分內容是非常重要的。

      一般的 WEB 服務 API,或者是 Nginx 都會設置一個超時時間,超過這個時間,如果還沒有數據返回,服務端可能直接返回一個超時錯誤,或者客戶端也可能結束這個連接。

      如果沒有這個超時時間,那是相當危險的。所有請求都阻塞在服務端,會消耗大量資源,比如內存。如果資源耗盡的話,甚至可能會導致整個服務崩潰。

      那么,在 gRPC 中怎么設置超時時間呢?主要是通過上下文 context.Context 參數,具體來說就是 context.WithDeadline 函數。

      proto 文件

      創建最簡單的 proto 文件,這個不多說。

      syntax = "proto3";
      
      package proto;
      
      // The greeting service definition.
      service Greeter {
          // Sends a greeting
          rpc SayHello (HelloRequest) returns (HelloReply) {}
      }
      
      // The request message containing the user's name.
      message HelloRequest {
          string name = 1;
      }
      
      // The response message containing the greetings
      message HelloReply {
          string message = 1;
      }
      

      客戶端

      package main
      
      import (
      	"client/proto"
      	"context"
      	"fmt"
      	"log"
      	"time"
      
      	"google.golang.org/grpc"
      	"google.golang.org/grpc/codes"
      	"google.golang.org/grpc/status"
      )
      
      func main() {
      	// 簡單調用
      	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
      	defer conn.Close()
      
      	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(3*time.Second)))
      	defer cancel()
      
      	client := proto.NewGreeterClient(conn)
      	// 簡單調用
      	reply, err := client.SayHello(ctx, &proto.HelloRequest{Name: "zzz"})
      	if err != nil {
      		statusErr, ok := status.FromError(err)
      		if ok {
      			if statusErr.Code() == codes.DeadlineExceeded {
      				log.Fatalln("client.SayHello err: deadline")
      			}
      		}
      
      		log.Fatalf("client.SayHello err: %v", err)
      	}
      	fmt.Println(reply.Message)
      }
      

      通過下面的函數設置一個 3s 的超時時間:

      ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(3*time.Second)))
      defer cancel()
      

      然后在響應錯誤中對超時錯誤進行檢測。

      服務端

      package main
      
      import (
      	"context"
      	"fmt"
      	"log"
      	"net"
      	"runtime"
      	"server/proto"
      	"time"
      
      	"google.golang.org/grpc"
      	"google.golang.org/grpc/codes"
      	"google.golang.org/grpc/reflection"
      	"google.golang.org/grpc/status"
      )
      
      type greeter struct {
      }
      
      func (*greeter) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
      	data := make(chan *proto.HelloReply, 1)
      	go handle(ctx, req, data)
      	select {
      	case res := <-data:
      		return res, nil
      	case <-ctx.Done():
      		return nil, status.Errorf(codes.Canceled, "Client cancelled, abandoning.")
      	}
      }
      
      func handle(ctx context.Context, req *proto.HelloRequest, data chan<- *proto.HelloReply) {
      	select {
      	case <-ctx.Done():
      		log.Println(ctx.Err())
      		runtime.Goexit() //超時后退出該Go協程
      	case <-time.After(4 * time.Second): // 模擬耗時操作
      		res := proto.HelloReply{
      			Message: "hello " + req.Name,
      		}
      		// //修改數據庫前進行超時判斷
      		// if ctx.Err() == context.Canceled{
      		// 	...
      		// 	//如果已經超時,則退出
      		// }
      		data <- &res
      	}
      }
      
      func main() {
      	lis, err := net.Listen("tcp", ":50051")
      	if err != nil {
      		log.Fatalf("failed to listen: %v", err)
      	}
      
      	// 簡單調用
      	server := grpc.NewServer()
      	// 注冊 grpcurl 所需的 reflection 服務
      	reflection.Register(server)
      	// 注冊業務服務
      	proto.RegisterGreeterServer(server, &greeter{})
      
      	fmt.Println("grpc server start ...")
      	if err := server.Serve(lis); err != nil {
      		log.Fatalf("failed to serve: %v", err)
      	}
      }
      

      服務端增加一個 handle 函數,其中 case <-time.After(4 * time.Second) 表示 4s 之后才會執行其對應代碼,用來模擬超時請求。

      如果客戶端超時時間超過 4s 的話,就會產生超時報錯。

      下面來模擬一下:

      服務端:

      $ go run main.go
      grpc server start ...
      2021/10/24 22:57:40 context deadline exceeded
      

      客戶端:

      $ go run main.go
      2021/10/24 22:57:40 client.SayHello err: deadline
      exit status 1
      

      源碼地址: GitHub

      總結

      本文主要介紹了 gRPC 的三部分實戰內容,分別是:

      1. 發布訂閱模式
      2. REST 接口
      3. 超時控制

      個人感覺,超時控制還是最重要的,在平時的開發過程中需要多多注意。

      結合上篇文章,gRPC 的實戰內容就寫完了,代碼全部可以執行,也都上傳到了 GitHub

      大家如果有任何疑問,歡迎給我留言,如果感覺不錯的話,也歡迎關注和轉發。


      源碼地址:

      推薦閱讀:

      參考:

      posted @ 2021-10-27 09:34  yongxinz  閱讀(847)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲欧美日韩综合一区在线| 视频一区二区 国产视频| √天堂中文在线最新版| 亚洲天堂一区二区成人在线| 免费乱理伦片在线观看| 女人腿张开让男人桶爽| 一边吃奶一边摸做爽视频| 亚洲天堂成人黄色在线播放| 一区二区三区国产不卡| 中文国产不卡一区二区| 精品国产乱码久久久久APP下载| 国内揄拍国内精品人妻| 日韩熟妇中文色在线视频| 国产日韩精品一区二区在线观看播放 | 国产玖玖玖玖精品电影| 中西区| 亚洲激情国产一区二区三区| 国产一区二区波多野结衣| 国产精品SM捆绑调教视频| 国产成人高清精品免费软件| 亚洲自偷自偷在线成人网站传媒| 啊灬啊灬啊灬快灬高潮了电影片段 | 人人人澡人人肉久久精品| 国产L精品国产亚洲区在线观看| 欧美日韩国产综合草草| 久久一本人碰碰人碰| 国产成人精品一区二区秒拍1o| 亚洲欧洲美洲无码精品va| 九九热视频精品在线播放| 亚洲乱理伦片在线观看中字| 亚洲AV成人无码精品电影在线| 国产稚嫩高中生呻吟激情在线视频 | 一区二区三区岛国av毛片| 精品国产精品中文字幕| 秋霞鲁丝片成人无码| 亚洲国产一区二区精品专| 武装少女在线观看高清完整版免费 | 91区国产福利在线观看午夜| 99riav精品免费视频观看| 成人乱码一区二区三区av| 亚洲熟妇av综合一区二区|