[gRPC] Lecture 11 – P1: Implement server-streaming gRPC API in Golang


Hi everyone. Today we will learn how to implement a server-streaming RPC in Go. First we will define a new RPC in the proto file to search for laptops that meet some specific requirements. Then we will implement the server and the client, and write unit tests for that RPC.

Alright, let’s start! I will open the pcbook golang project that
we’ve been working on. Our RPC will allow us to search for laptops that satisfy some configuration requirements, so I will create a filter_message.proto file. This message defines what kind of laptop we’re looking for: the maximum price that we’re willing to pay, the minimum number of cores that the laptop CPU should have, the minimum frequency of the CPU, and the minimum size of the RAM.

OK, now we will define the new server-streaming
RPC in the laptop_service.proto file. First we define the SearchLaptopRequest, which contains only 1 Filter field, then a SearchLaptopResponse, which contains only 1 Laptop field. The server-streaming RPC is defined in a similar way to the unary RPC. Start with the rpc keyword, then the RPC name, which is SearchLaptop. The input is SearchLaptopRequest, and the output is a stream of SearchLaptopResponse. That’s it. Pretty straightforward.

OK, let’s generate the code. In the laptop_service.pb.go file, some new code has been added. We have the SearchLaptopRequest struct and the SearchLaptopResponse struct, then the LaptopServiceClient interface with a new SearchLaptop function. Similarly, we also have a new SearchLaptop function inside the LaptopServiceServer interface.

We will implement the server side first. Let’s add a Search() function to the LaptopStore
interface. It takes a filter as input, and also a callback function to report whenever a laptop is found. It returns an error.

Now let’s implement this function for the InMemoryLaptopStore. Since we’re reading data, we have to acquire a read lock. Remember to unlock it afterward. We iterate through all laptops in the store and check which ones satisfy the filter.

The isQualified() function takes a filter and a laptop as input, and returns true if the laptop satisfies the filter. If the laptop price is greater than the maximum price in the filter, return false. If the number of cores of the laptop CPU is less than the minimum cores in the filter, return false. If the minimum frequency of the laptop CPU is less than that of the filter, return false.

Now we have to compare the RAM. Since there are different types of memory units, to compare them we have to write a function that converts the memory size to the smallest unit: BIT. If the size of the laptop RAM is smaller than
that of the filter, return false. Else return true.

Now let’s implement the toBit() function. First we get the memory value. Then we do a switch-case on the memory unit. If it is BIT, we simply return the value. If it is BYTE, we have to multiply the value by 8 because 1 BYTE = 8 BITs. And because 8 = 2^3, we can use a left shift by 3 here instead of a multiplication. If it is KILOBYTE, we have to multiply the value by 1024 and by 8, because 1 KILOBYTE = 1024 BYTEs. And because 1024 * 8 = 2^13, we can use a simple left shift by 13 here. Similarly, if it is MEGABYTE, we return the value shifted left by 23. For GIGABYTE, the value shifted left by 33. And finally for TERABYTE, the value shifted left by 43. For the default case, just return 0.

Now let’s go back to our Search()
function. When a laptop is qualified, we have to deep-copy it before calling the callback function. Since deep-copy is used in many places, I will write a separate function for it. Just copy and paste the code block into this deepCopy function. Then in the Find() function, we simply return deepCopy(laptop), and the Save() function can also be simplified in the same way.

In the Search() function, we deep-copy the qualified laptop, and call found() to send it to the caller. If there’s an error, return it. Otherwise, return nil at the end of the function. OK, the store is done.

Now let’s implement the server. We will have to implement the SearchLaptop
function of the LaptopServiceServer interface. So I will copy this function and paste it into the laptop_server.go file. It has 2 arguments: the input request and the output stream for responses.

The first thing we do is get the filter from the request. Then we write a log saying a search-laptop request is received with this filter. And we call server.Store.Search, passing in the filter and a callback function. If an error occurs, we return it with the Internal status code. Else we return nil.

Now in the callback function, when we find a laptop, we create a new response object with that laptop and send it to the client by calling stream.Send(). If an error occurs, just return it. Else, we write a simple log saying we have sent the laptop with this ID, then return nil. And we’re done with the server.

Now let’s implement the client. First I will make a separate function to create
a random laptop. Let’s copy this code block and paste it into the createLaptop() function. Now in the main function, we will use a for loop to create 10 random laptops. Then we will create a new search filter. I want to search for laptops with a maximum price of 3000, at least 4 CPU cores, a minimum frequency of 2.5, and at least 8 gigabytes of RAM.

Now we call searchLaptop with the client and the filter. Let’s write this function. First we write a log to show the filter values. Then we create a context with a timeout of 5 seconds. We make a SearchLaptopRequest object with the filter. Then we call laptopClient.SearchLaptop() to get the stream. If there’s an error, write a fatal log. Else, we use a for loop to receive multiple responses from the stream. If it returns an end-of-file (EOF) error, this means it’s the end of the stream, so we just return. Otherwise, if the error is not nil, we write a fatal log. If everything goes well, we can get the laptop from the stream. I will print out only a few properties of the laptop so that it’s easier to read: the laptop ID, the brand, the name, the number of CPU cores, the min frequency of the CPU, the RAM, and finally the price.

OK, now let’s run the server and run the client. There’s a deadline exceeded error when creating
the laptops. This is because in the previous lecture, we added a sleep of 6 seconds on the server side. So let’s comment this out, restart the server, then re-run the client. This time we’ve created 10 laptops and found 1 laptop that matches the filter. Let’s run it one more time to create 10 more laptops. This time we found 3 matching laptops. Let’s look at the server logs. Here it received a search-laptop request and sent 3 laptops to the client. Perfect!

Now let’s simulate the timeout case. In the Search function of the laptop store, let’s say it runs very slowly and each iteration takes 1 second. We write a log here so that we can track the progress. OK, let’s restart the server, then run the client. After a few seconds, it gets a deadline exceeded error. Let’s run it one more time so that the server will have more records to scan through. OK, the deadline exceeded error is returned. However, on the server side, you can see that it’s still checking more records. This is useless because the client has already cancelled the request.

So let’s fix it. In this for loop, before checking whether a laptop is qualified or not, we have to check the context status. To do that, we have to add the request context as a parameter of the Search function. Alright, now we check if the context error is Canceled or DeadlineExceeded. If it is, we write a log and return an error saying the context is cancelled. In the laptop server, we have to get the context from the stream and pass it into the Search function. And that’s it.

Now let’s restart the server, then run the client. One more time. OK. This time, on the server side, we can see a log “context is cancelled”, and it stops processing other records. So it works as expected.

Now I will show you how to write unit tests
for the server-streaming RPC. There are 2 ways to do this. The first way is to mock this stream interface and provide an implementation of the Send function to catch the responses. But we would also need to add empty implementations of the functions in the grpc.ServerStream interface. There are about 6 of them, so it’s too much. Thus I will use the 2nd way, which is to use the client to call the RPC on the test server.

I will copy this setup block and paste it here to make a new unit test. It’s gonna be TestClientSearchLaptop. First I will create a search filter. Let’s say the max price is 2000, the min CPU cores is 4, the min CPU frequency is 2.2, and the min RAM is 8 gigabytes. Next I will create a new in-memory laptop store to insert some laptops for searching. This expectedIDs map will contain all laptop IDs that we expect to be found by the server.

OK, now we will use a for loop to create 6 laptops. The first case will be an unmatched laptop with too high a price. The second case is also unmatched because it has only 2 cores. The third case doesn’t match because the min frequency is too low. The fourth case doesn’t match since it has only 4 gigabytes of RAM. The fifth case is gonna be a matched laptop: the price is 1999, it has 4 cores, with a minimum frequency of 2.5, a max frequency of 4.5, and 16 gigabytes of RAM. We add the ID of this laptop to the expectedIDs map. The last case is also matched, so I will just duplicate the previous one and change the configuration a bit. Alright, now we call Store.Save to save the laptop
to the store, and require that there’s no error.

Next we have to add this store to the test laptop server. I will add one more store parameter to this function, then update the create-laptop test to pass in a new in-memory laptop store. OK, back to our search-laptop test. Here we’re not gonna use the laptopServer object, so I will remove it. Now we create a new SearchLaptopRequest with the filter. Then we call laptopClient.SearchLaptop with the created request. We require no errors to be returned.

Next, I will use this variable to keep track of the number of laptops found. Then use a for loop to receive multiple responses. If we get an end-of-file error, then break. Else we check that there’s no error, and that the laptop ID is in the expectedIDs map. Then we increase the number of laptops found. Finally, we require that number to equal the size of expectedIDs.

OK, now let’s run this unit test. It passed, but it took 6 seconds to run. That’s because we forgot to comment out the time.Sleep in the Search function. So let’s comment it out and re-run the test. It’s much faster now. Let’s run the whole package test. All passed, and the coverage is 75.8%.

That’s all for today’s lecture. We have learned how to implement and test a server-streaming RPC in Go. In the next video, we will learn how to do that in Java. Thanks for watching and I will see you later.
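
As a recap of the filter logic described in this lecture, here is a minimal sketch of toBit() and isQualified(). It uses simplified stand-in types: MemoryUnit, Memory, Laptop, Filter and all their field names are assumptions made for illustration, not the structs generated from the proto files, so treat it as a sketch rather than the exact code of the pcbook project.

```go
package main

import "fmt"

// MemoryUnit is a hypothetical stand-in for the unit enum of the generated memory message.
type MemoryUnit int

const (
	Bit MemoryUnit = iota
	Byte
	Kilobyte
	Megabyte
	Gigabyte
	Terabyte
)

// Memory is a simplified stand-in for the generated memory message.
type Memory struct {
	Value uint64
	Unit  MemoryUnit
}

// Laptop keeps only the fields the filter compares (field names are assumed).
type Laptop struct {
	PriceUSD  float64
	CPUCores  uint32
	CPUMinGHz float64
	RAM       Memory
}

// Filter mirrors the four requirements listed in the lecture (field names are assumed).
type Filter struct {
	MaxPriceUSD float64
	MinCPUCores uint32
	MinCPUGHz   float64
	MinRAM      Memory
}

// toBit converts a memory size to bits using left shifts instead of multiplications.
func toBit(m Memory) uint64 {
	switch m.Unit {
	case Bit:
		return m.Value
	case Byte:
		return m.Value << 3 // 8 = 2^3
	case Kilobyte:
		return m.Value << 13 // 1024 * 8 = 2^13
	case Megabyte:
		return m.Value << 23
	case Gigabyte:
		return m.Value << 33
	case Terabyte:
		return m.Value << 43
	default:
		return 0
	}
}

// isQualified reports whether the laptop satisfies every requirement of the filter.
func isQualified(f Filter, l Laptop) bool {
	if l.PriceUSD > f.MaxPriceUSD {
		return false
	}
	if l.CPUCores < f.MinCPUCores {
		return false
	}
	if l.CPUMinGHz < f.MinCPUGHz {
		return false
	}
	if toBit(l.RAM) < toBit(f.MinRAM) {
		return false
	}
	return true
}

func main() {
	filter := Filter{MaxPriceUSD: 3000, MinCPUCores: 4, MinCPUGHz: 2.5, MinRAM: Memory{Value: 8, Unit: Gigabyte}}
	laptop := Laptop{PriceUSD: 1999, CPUCores: 4, CPUMinGHz: 2.5, RAM: Memory{Value: 16, Unit: Gigabyte}}

	fmt.Println("8 gigabytes in bits:", toBit(filter.MinRAM))
	fmt.Println("qualified:", isQualified(filter, laptop))
}
```

Converting both sides to bits first means a laptop with 16384 megabytes and a filter asking for 8 gigabytes are compared on the same scale, which is why the comparison goes through toBit() instead of comparing the raw values.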
