RPC Principles and gRPC Details

1, RPC principles

1. Origin of RPC frameworks

As a monolithic application grows larger and larger, its code becomes hard to maintain and manage. Microservice architecture emerged in response: the application is split into independent services along shared or functional module boundaries, and those services then call one another.

How do microservices call each other?
First, five problems need to be solved:
1. How is the syntax of a remote call defined?
2. How are parameters passed?
3. How is data represented?
4. How does a caller know which remote calls a server implements, and on which port each call can be reached?
5. What should happen when there are errors, retransmissions, packet loss, performance problems, and the like?

You may have written socket or HTTP code before, in the simple client-calls-server style, and assume that this is enough for services to call one another. Once the five problems above are taken into account, though, it is not so easy to handle, and it is not work that one person can complete alone.

So RPC frameworks were born. They let us stop worrying about the underlying implementation and are simple and easy to use.

2. Principles of RPC framework

When a client application makes a remote call, it actually calls the client Stub. The Stub encodes the interface, method, and parameters of the call according to the agreed protocol, and hands the resulting network packet to the local RPCRuntime, which transmits it to the server. The server's RPCRuntime receives the request and passes it to the server Stub to decode, and the server Stub then calls the server-side method. The server executes the method and returns the result, which the server Stub encodes and sends back through the RPCRuntime. The client's RPCRuntime receives it, the client Stub decodes it, and the result is returned to the client application.

1. For the client, all of this is transparent and feels like a local call; the server only needs to focus on business logic.
2. The Stub layer handles the agreed syntax and semantics, plus encapsulation and decapsulation.
3. RPCRuntime mainly handles high-performance transmission, along with network errors and exceptions.

Here is how an RPC framework solves the five problems above: problems 1, 2, and 3 are handled by the Stub layer, problem 4 by service registration and publishing, and problem 5 by RPCRuntime.
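
To make the Stub/RPCRuntime split concrete, here is a minimal sketch in Go. It is not how any real framework is written: the JSON wire format, the request and response types, and the Add method are all hypothetical, and the "runtime" is just a function call standing in for the network.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// request and response are a hypothetical wire format agreed on by both stubs.
type request struct {
	Method string `json:"method"`
	Args   []int  `json:"args"`
}

type response struct {
	Result int    `json:"result"`
	Err    string `json:"err,omitempty"`
}

// transport stands in for RPCRuntime: it carries bytes to the server and back.
type transport func(payload []byte) ([]byte, error)

// add is the server-side business logic; it knows nothing about the wire.
func add(a, b int) int { return a + b }

// serverStub decodes the request, dispatches to the business logic,
// and encodes the result.
func serverStub(payload []byte) ([]byte, error) {
	var req request
	if err := json.Unmarshal(payload, &req); err != nil {
		return nil, err
	}
	var resp response
	switch {
	case req.Method == "Add" && len(req.Args) == 2:
		resp.Result = add(req.Args[0], req.Args[1])
	default:
		resp.Err = "unknown method or bad arguments: " + req.Method
	}
	return json.Marshal(resp)
}

// clientStub makes the remote call look like a local method call.
type clientStub struct{ send transport }

func (c clientStub) Add(a, b int) (int, error) {
	payload, err := json.Marshal(request{Method: "Add", Args: []int{a, b}})
	if err != nil {
		return 0, err
	}
	raw, err := c.send(payload)
	if err != nil {
		return 0, err
	}
	var resp response
	if err := json.Unmarshal(raw, &resp); err != nil {
		return 0, err
	}
	if resp.Err != "" {
		return 0, errors.New(resp.Err)
	}
	return resp.Result, nil
}

func main() {
	// Wire the client stub directly to the server stub; a real runtime
	// would put a network connection in between.
	stub := clientStub{send: serverStub}
	sum, err := stub.Add(2, 3)
	fmt.Println(sum, err) // prints "5 <nil>"
}
```

The caller only ever sees stub.Add(2, 3); encoding, transport, and dispatch stay hidden behind it, which is the transparency described in point 1.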

2, gRPC principles

gRPC is a high-performance, open-source, general-purpose RPC framework designed with mobile and HTTP/2 in mind. C, Java, and Go versions are currently provided: grpc, grpc-java, and grpc-go respectively. The C version supports C, C++, Node.js, Python, Ruby, Objective-C, PHP, and C#.

This article uses the Go version for its explanations.
1. Installing gRPC for Go
Detailed installation link
2. Principles of Protocol Buffers
Article 1
Article 2
3. grpc-go GitHub address:
GitHub repository
GoDoc documentation

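In grpc-go, the Stub layer's protocol is defined in .proto files, which describe the service interface, parameters, and so on. The protoc-gen-go plugin then generates the stub code shared by the client and the server. To generate code for another language, use the corresponding plugin, which is how cross-language support is achieved.
The command to generate the Go files is as follows (this form applies to the older protoc-gen-go that bundled the grpc plugin; newer releases of the Go protobuf tooling moved gRPC code generation to a separate protoc-gen-go-grpc plugin):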
protoc --go_out=plugins=grpc:. *.proto
The gRPC RPCRuntime layer is built on HTTP/2, which brings features such as bidirectional streaming, flow control, header compression, and multiplexing of many requests over a single TCP connection.

gRPC server startup

1. Overall startup process

func main() {
    // Parse command-line flags
    flag.Parse()
    // Listen on the configured protocol, address, and port
    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // Additional gRPC server options; here, mainly whether TLS encryption is enabled
    var opts []grpc.ServerOption
    if *tls {
        if *certFile == "" {
            *certFile = testdata.Path("server1.pem")
        }
        if *keyFile == "" {
            *keyFile = testdata.Path("server1.key")
        }
        creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
        if err != nil {
            log.Fatalf("Failed to generate credentials %v", err)
        }
        opts = []grpc.ServerOption{grpc.Creds(creds)}
    }
    // Create the gRPC server with the configured options
    grpcServer := grpc.NewServer(opts...)
    // Register the implementation of the API defined in the .proto file so the server can dispatch calls to it
    pb.RegisterRouteGuideServer(grpcServer, newServer())
    // Start serving on the listener; Serve blocks until the server stops
    grpcServer.Serve(lis)
}

2. Serve function

The core of Serve is a for loop. If Accept() returns a temporary error, Serve retries after a delay that starts at 5 ms and doubles on each consecutive failure, capped at 1 s. A successful Accept() resets the delay to zero.

    for {
        rawConn, err := lis.Accept()
        // Error handling
        if err != nil {
            if ne, ok := err.(interface {
                Temporary() bool
            }); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                s.mu.Lock()
                s.printf("Accept error: %v; retrying in %v", err, tempDelay)
                s.mu.Unlock()
                timer := time.NewTimer(tempDelay)
                select {
                case <-timer.C:
                case <-s.quit.Done():
                    timer.Stop()
                    return nil
                }
                continue
            }
            s.mu.Lock()
            s.printf("done serving; Accept = %v", err)
            s.mu.Unlock()

            if s.quit.HasFired() {
                return nil
            }
            return err
        }
        tempDelay = 0
        // Start a new goroutine to deal with rawConn so we don't stall this Accept
        // loop goroutine.
        //
        // Make sure we account for the goroutine so GracefulStop doesn't nil out
        // s.conns before this conn can be added.
        s.serveWG.Add(1)
        // Start a new goroutine to handle the accepted connection
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
    }

3. handleRawConn function

Its main job is to create a server-side Transport for the connection and start a goroutine that waits for streams; serving those streams is where the registered methods get called.

    st := s.newHTTP2Transport(conn, authInfo)
    if st == nil {
        return
    }

    rawConn.SetDeadline(time.Time{})
    if !s.addConn(st) {
        return
    }
    go func() {
        s.serveStreams(st)
        s.removeConn(st)
    }()

gRPC client startup

1. Establish a connection and bind the interface implementation

    // Parse command-line flags
    flag.Parse()
    // Connection options: mainly encryption, security, and blocking
    var opts []grpc.DialOption
    if *tls {
        if *caFile == "" {
            *caFile = testdata.Path("ca.pem")
        }
        creds, err := credentials.NewClientTLSFromFile(*caFile, *serverHostOverride)
        if err != nil {
            log.Fatalf("Failed to create TLS credentials %v", err)
        }
        opts = append(opts, grpc.WithTransportCredentials(creds))
    } else {
        opts = append(opts, grpc.WithInsecure())
    }

    opts = append(opts, grpc.WithBlock())
    //Establish a connection
    conn, err := grpc.Dial(*serverAddr, opts...)
    if err != nil {
        log.Fatalf("fail to dial: %v", err)
    }
    defer conn.Close()
    
    // Create a client for the API defined in the .proto file
    client := pb.NewRouteGuideClient(conn)

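2. Client call modes

For each mode, the snippets show, in order, the generated client stub, the client-side caller, and the server-side implementation.

Unary RPC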

func (c *routeGuideClient) GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error) {
    out := new(Feature)
    err := c.cc.Invoke(ctx, "/routeguide.RouteGuide/GetFeature", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

// printFeature gets the feature for the given point.
func printFeature(client pb.RouteGuideClient, point *pb.Point) {
    log.Printf("Getting feature for point (%d, %d)", point.Latitude, point.Longitude)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    feature, err := client.GetFeature(ctx, point)
    if err != nil {
        log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
    }
    log.Println(feature)
}

// GetFeature returns the feature at the given point.
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
    for _, feature := range s.savedFeatures {
        if proto.Equal(feature.Location, point) {
            return feature, nil
        }
    }
    // No feature was found, return an unnamed feature
    return &pb.Feature{Location: point}, nil
}

Server-side streaming RPC

func (c *routeGuideClient) ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error) {
    stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[0], "/routeguide.RouteGuide/ListFeatures", opts...)
    if err != nil {
        return nil, err
    }
    x := &routeGuideListFeaturesClient{stream}
    if err := x.ClientStream.SendMsg(in); err != nil {
        return nil, err
    }
    if err := x.ClientStream.CloseSend(); err != nil {
        return nil, err
    }
    return x, nil
}

// printFeatures lists all the features within the given bounding Rectangle.
func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {
    log.Printf("Looking for features within %v", rect)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.ListFeatures(ctx, rect)
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    for {
        feature, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
        }
        log.Println(feature)
    }
}

// ListFeatures lists all features contained within the given bounding Rectangle.
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    for _, feature := range s.savedFeatures {
        if inRange(feature.Location, rect) {
            if err := stream.Send(feature); err != nil {
                return err
            }
        }
    }
    return nil
}

Client-side streaming RPC

func (c *routeGuideClient) RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error) {
    stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[1], "/routeguide.RouteGuide/RecordRoute", opts...)
    if err != nil {
        return nil, err
    }
    x := &routeGuideRecordRouteClient{stream}
    return x, nil
}

// runRecordRoute sends a sequence of points to server and expects to get a RouteSummary from server.
func runRecordRoute(client pb.RouteGuideClient) {
    // Create a random number of random points
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
    var points []*pb.Point
    for i := 0; i < pointCount; i++ {
        points = append(points, randomPoint(r))
    }
    log.Printf("Traversing %d points.", len(points))
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.RecordRoute(ctx)
    if err != nil {
        log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
    }
    for _, point := range points {
        if err := stream.Send(point); err != nil {
            log.Fatalf("%v.Send(%v) = %v", stream, point, err)
        }
    }
    reply, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
    }
    log.Printf("Route summary: %v", reply)
}


// RecordRoute records a route composited of a sequence of points.
//
// It gets a stream of points, and responds with statistics about the "trip":
// number of points,  number of known features visited, total distance traveled, and
// total time spent.
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
    var pointCount, featureCount, distance int32
    var lastPoint *pb.Point
    startTime := time.Now()
    for {
        point, err := stream.Recv()
        if err == io.EOF {
            endTime := time.Now()
            return stream.SendAndClose(&pb.RouteSummary{
                PointCount:   pointCount,
                FeatureCount: featureCount,
                Distance:     distance,
                ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
            })
        }
        if err != nil {
            return err
        }
        pointCount++
        for _, feature := range s.savedFeatures {
            if proto.Equal(feature.Location, point) {
                featureCount++
            }
        }
        if lastPoint != nil {
            distance += calcDistance(lastPoint, point)
        }
        lastPoint = point
    }
}

Bidirectional streaming RPC

func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
    stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[2], "/routeguide.RouteGuide/RouteChat", opts...)
    if err != nil {
        return nil, err
    }
    x := &routeGuideRouteChatClient{stream}
    return x, nil
}

// runRouteChat receives a sequence of route notes, while sending notes for various locations.
func runRouteChat(client pb.RouteGuideClient) {
    notes := []*pb.RouteNote{
        {Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.RouteChat(ctx)
    if err != nil {
        log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
    }
    waitc := make(chan struct{})
    go func() {
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                // read done.
                close(waitc)
                return
            }
            if err != nil {
                log.Fatalf("Failed to receive a note : %v", err)
            }
            log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
        }
    }()
    for _, note := range notes {
        if err := stream.Send(note); err != nil {
            log.Fatalf("Failed to send a note: %v", err)
        }
    }
    stream.CloseSend()
    <-waitc
}

// RouteChat receives a stream of message/location pairs, and responds with a stream of all
// previous messages at each of those locations.
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        key := serialize(in.Location)

        s.mu.Lock()
        s.routeNotes[key] = append(s.routeNotes[key], in)
        // Note: this copy prevents blocking other clients while serving this one.
        // We don't need to do a deep copy, because elements in the slice are
        // insert-only and never modified.
        rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
        copy(rn, s.routeNotes[key])
        s.mu.Unlock()

        for _, note := range rn {
            if err := stream.Send(note); err != nil {
                return err
            }
        }
    }
}
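
The four call modes differ only in how many messages flow in each direction. As a rough mental model (not the gRPC API), the sketch below uses plain Go channels in place of streams, with closing a channel playing the role of CloseSend and io.EOF. All function names here are made up for illustration.

```go
package main

import "fmt"

// Unary: one request, one response.
func square(n int) int { return n * n }

// Server-side streaming: one request, a stream of responses.
func countTo(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // closing the channel is the "end of stream"
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// Client-side streaming: a stream of requests, one response at the end.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// Bidirectional streaming: a stream each way; every request produces
// a response carrying the running total so far.
func runningTotal(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		total := 0
		for v := range in {
			total += v
			out <- total
		}
	}()
	return out
}

func main() {
	fmt.Println("unary:", square(4)) // prints "unary: 16"

	for v := range countTo(3) {
		fmt.Println("server stream:", v) // 1, 2, 3
	}

	fmt.Println("client stream:", sum(countTo(4))) // prints "client stream: 10"

	for v := range runningTotal(countTo(3)) {
		fmt.Println("bidi:", v) // 1, 3, 6
	}
}
```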

The two main methods of the client connection

1. Invoke function

newClientStream: obtains the transport-layer Transport, wraps it in a ClientStream, and returns it. This step involves load balancing, timeout control, encoding, and stream handling, and is basically the same as the behavior on the server side.
cs.SendMsg: sends the RPC request; it does not wait for the response.
cs.RecvMsg: blocks until the response to the RPC call has been received.

// Invoke sends the RPC request on the wire and returns after response is
// received.  This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
    // allow interceptor to see all applicable call options, which means those
    // configured as defaults from dial option as well as per-call options
    opts = combine(cc.dopts.callOptions, opts)

    if cc.dopts.unaryInt != nil {
        return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
    }
    return invoke(ctx, method, args, reply, cc, opts...)
}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

2. NewStream function

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
//      1. Call Close on the ClientConn.
//      2. Cancel the context provided.
//      3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//         client-streaming RPC, for instance, might use the helper function
//         CloseAndRecv (note that CloseSend does not Recv, therefore is not
//         guaranteed to release all resources).
//      4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
    // allow interceptor to see all applicable call options, which means those
    // configured as defaults from dial option as well as per-call options
    opts = combine(cc.dopts.callOptions, opts)

    if cc.dopts.streamInt != nil {
        return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
    }
    return newClientStream(ctx, desc, cc, method, opts...)
}

Posted on Tue, 10 Mar 2020 20:44:49 -0700 by reckdan