Go gRPC Tutorial - Server-side Streaming RPC

Preface

The previous article introduced simple (unary) RPC. When there is a large amount of data, or when data needs to be transmitted continuously, streaming RPC lets us process data while it is still being transmitted. This article starts with server-side streaming RPC.

Server-side streaming RPC: the client sends a single request to the server and gets back a stream from which it reads a sequence of messages. The client keeps reading from the stream until there are no more messages.
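The "read until there are no more messages" contract can be sketched without gRPC at all. The example below is only an illustration: sliceStream and drain are hypothetical names, not part of grpc-go, and the stream is an in-memory slice that signals its end with io.EOF, the same sentinel the real client code later in this article checks for.

```go
package main

import (
	"fmt"
	"io"
)

// sliceStream is a hypothetical in-memory stand-in for a gRPC client stream.
// It is not part of grpc-go; it only mimics the Recv contract.
type sliceStream struct {
	msgs []string
	pos  int
}

// Recv returns the next message, or io.EOF once every message has been read.
func (s *sliceStream) Recv() (string, error) {
	if s.pos >= len(s.msgs) {
		return "", io.EOF
	}
	msg := s.msgs[s.pos]
	s.pos++
	return msg, nil
}

// drain reads from the stream until io.EOF and returns everything it received.
func drain(s *sliceStream) ([]string, error) {
	var out []string
	for {
		msg, err := s.Recv()
		if err == io.EOF {
			return out, nil // normal end of stream
		}
		if err != nil {
			return out, err // any other error aborts the read loop
		}
		out = append(out, msg)
	}
}

func main() {
	got, err := drain(&sliceStream{msgs: []string{"price 0", "price 1", "price 2"}})
	fmt.Println(got, err)
}
```

The important detail is that io.EOF is treated as a normal end of stream rather than a failure, while every other error ends the loop as an error.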

Scenario simulation: Get stock trends in real time.

1. To get the real-time trend of a crude oil stock, the client sends a request

2. The server returns the stock trend in real time

Create the proto file

Create a new file named server_stream.proto.

1. Define the request message

// Define the request message
message SimpleRequest{
    // Field names use lower-case snake_case, such as student_name
    // Request parameter
    string data = 1;
}

2. Define the response message

// Define the streaming response message
message StreamResponse{
    // Streaming response data
    string stream_value = 1;
}

3. Define the service method ListValue

For server-side streaming RPC, simply add the stream keyword before the response type.

// Define our service (you can define multiple services, and each service can define multiple methods)
service StreamServer{
    // Server-side streaming RPC: add stream before the response type
    rpc ListValue(SimpleRequest) returns (stream StreamResponse){};
}

4. Compile the proto file

Go to the directory where server_stream.proto is located and run the command:

protoc --go_out=plugins=grpc:./ ./server_stream.proto

Create the Server

1. Define our service and implement the ListValue method

// StreamService defines our service
type StreamService struct{}

// ListValue implements the ListValue method
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
	for n := 0; n < 5; n++ {
		// Send a message to the stream; by default the maximum size of each sent message is `math.MaxInt32` bytes
		err := srv.Send(&pb.StreamResponse{
			StreamValue: req.Data + strconv.Itoa(n),
		})
		if err != nil {
			return err
		}
	}
	return nil
}

Beginners may find it confusing how the parameters and return value of ListValue are determined. They are all defined in the .pb.go file that was generated when the proto file was compiled; we only need to implement the method with that signature.
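Because the stream parameter is an interface, the handler logic can be exercised without a network connection. The sketch below assumes a trimmed-down interface: valueSender, recorder and listValue are hypothetical stand-ins written for this illustration (the real generated interface, pb.StreamServer_ListValueServer, takes a *pb.StreamResponse and has more methods), but the loop has the same shape as the ListValue method above.

```go
package main

import (
	"fmt"
	"strconv"
)

// valueSender is a hypothetical, trimmed-down stand-in for the generated
// pb.StreamServer_ListValueServer interface: only Send is modeled, and it
// takes a plain string instead of *pb.StreamResponse.
type valueSender interface {
	Send(value string) error
}

// recorder is a fake stream that stores every value passed to Send.
type recorder struct {
	sent []string
}

// Send records the value and never fails.
func (r *recorder) Send(value string) error {
	r.sent = append(r.sent, value)
	return nil
}

// listValue mirrors the shape of the article's ListValue handler: it sends
// five values and returns the first Send error, if any.
func listValue(data string, srv valueSender) error {
	for n := 0; n < 5; n++ {
		if err := srv.Send(data + strconv.Itoa(n)); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	rec := &recorder{}
	if err := listValue("stream server grpc ", rec); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	for _, v := range rec.sent {
		fmt.Println(v)
	}
}
```

Anything that provides a matching Send method can be passed to the handler, which is why the generated code can hand it a real network stream while a test can hand it a recorder.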

2. Start the gRPC server

const (
	// Address is the address the server listens on
	Address string = ":8000"
	// Network is the network protocol used for communication
	Network string = "tcp"
)

func main() {
	// Listen on local ports
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " net.Listing...")
	// Create a new gRPC server instance
	// By default the maximum size of a received message is `1024*1024*4` bytes (4 MB) and the maximum size of a sent message is `math.MaxInt32` bytes
	// grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32))
	grpcServer := grpc.NewServer()
	// Register our service on the gRPC server
	pb.RegisterStreamServerServer(grpcServer, &StreamService{})

	// Serve() accepts connections on the listener and blocks until the process is killed or Stop() is called
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

Run Server

go run server.go
:8000 net.Listing...

Create the Client

1. Call the server's ListValue method

// listValue calls the ListValue method on the server
func listValue() {
	// Build the request message
	req := pb.SimpleRequest{
		Data: "stream server grpc ",
	}
	// Call our service (ListValue method)
	stream, err := grpcClient.ListValue(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call ListValue err: %v", err)
	}
	for {
		// Recv() receives one message from the server; by default the maximum size of each received message is `1024*1024*4` bytes (4 MB)
		res, err := stream.Recv()
		// io.EOF means the server has finished sending and the stream has ended
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("ListValue get stream err: %v", err)
		}
		// Print the returned value
		log.Println(res.StreamValue)
	}
}

2. Start the gRPC client

// Address Connection Address
const Address string = ":8000"

var grpcClient pb.StreamServerClient

func main() {
	// Connect to Server
	conn, err := grpc.Dial(Address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	// Create the gRPC client
	grpcClient = pb.NewStreamServerClient(conn)
	listValue()
}

Run Client

go run client.go
stream server grpc 0
stream server grpc 1
stream server grpc 2
stream server grpc 3
stream server grpc 4

The client keeps receiving data from the server until the stream ends.

Reflection

If the server keeps sending data, like getting real-time data on stock trends, can the client stop getting data on its own?

Answer: Yes

1. We'll slightly modify the ListValue method on the server side

// ListValue implements the ListValue method
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
	for n := 0; n < 15; n++ {
		// Send a message to the stream; by default the maximum size of each sent message is `math.MaxInt32` bytes
		err := srv.Send(&pb.StreamResponse{
			StreamValue: req.Data + strconv.Itoa(n),
		})
		if err != nil {
			return err
		}
		log.Println(n)
		time.Sleep(1 * time.Second)
	}
	return nil
}

2. Modify the client's implementation of calling the ListValue method to get the result

// listValue calls the ListValue method on the server
func listValue() {
	// Build the request message
	req := pb.SimpleRequest{
		Data: "stream server grpc ",
	}
	// The context.Context passed to the call lets us change RPC behavior when needed, such as setting a timeout or cancelling a running RPC
	ctx, cancel := context.WithCancel(context.Background())
	// cancel() aborts the RPC; the server's stream context is cancelled and its Send calls start returning an error
	defer cancel()
	// Call our service (ListValue method)
	stream, err := grpcClient.ListValue(ctx, &req)
	if err != nil {
		log.Fatalf("Call ListValue err: %v", err)
	}
	for {
		// Recv() receives one message from the server; by default the maximum size of each received message is `1024*1024*4` bytes (4 MB)
		res, err := stream.Recv()
		// io.EOF means the server has finished sending and the stream has ended
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("ListValue get stream err: %v", err)
		}
		// Print the returned value
		log.Println(res.StreamValue)
		// Stop reading after the first message
		break
	}
	// The deferred cancel() runs when this function returns and ends the RPC on both sides
}

Cancel the context passed to ListValue to end the stream and stop the server from sending data. Note that CloseSend() does not do this. It only closes the client-to-server direction of the stream, which the generated client code already does for a server-side streaming RPC once the request has been sent. That is why calling Recv() again after CloseSend() still returns messages: the server-to-client direction is unaffected.

A client that only wants to pause can simply stop calling Recv() for a while and call it again later to continue getting data.

Summary

This article showed how simple and practical server-side streaming RPC is. The client initiates a request and the server keeps returning data until the server stops sending or the client stops receiving on its own initiative. The next article describes client-side streaming RPC.

Tutorial source address: https://github.com/Bingjian-Zhu/go-grpc-example
Reference: Chinese version of the official gRPC documentation


Posted on Mon, 13 Apr 2020 19:00:18 -0700 by jkejser