Downloading files with a Server Streaming gRPC API.
Today I’m continuing my gRPC API series. In my last post, we went over unary gRPC APIs as an introduction to gRPC. Remember, there are four different types of gRPC APIs:
- Unary
- Server Streaming
- Client Streaming
- Bi-directional
In today’s post, I want to talk about server streaming.
What is Server Streaming?
Server streaming is when a gRPC client opens up a new connection with a gRPC server and gets a series of streamed responses from the server for a single request.
When would I need something like this?
Maybe you’re building a chat application or a live feed and don’t want the client to make a new request every time it wants to refresh the feed.
Another use case is sending large files over the wire. Instead of attempting to send the entire file in a single response, you can stream the file to the client in chunks. This is the example I’m going to use today.
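To make the "one request, many responses" idea concrete before any gRPC code shows up, here’s a minimal sketch that fakes it with a plain Go channel. None of this is gRPC: serveFeed and collect are hypothetical names of mine, and closing the channel just plays the role of the server ending the stream.

```go
package main

import "fmt"

// serveFeed is a hypothetical stand-in for a server-streaming endpoint:
// it takes one request and delivers several responses over a channel.
func serveFeed(topic string, updates int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out) // closing the channel plays the role of end-of-stream
		for i := 1; i <= updates; i++ {
			out <- fmt.Sprintf("%s update %d", topic, i)
		}
	}()
	return out
}

// collect drains the stream the way a client receive loop would.
func collect(stream <-chan string) []string {
	var got []string
	for msg := range stream {
		got = append(got, msg)
	}
	return got
}

func main() {
	// One call to serveFeed, three responses back.
	for _, msg := range collect(serveFeed("feed", 3)) {
		fmt.Println(msg)
	}
}
```

The client never asks again after the first call; it just keeps receiving until the stream is closed.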
Creating a project
I want to demonstrate server streaming by implementing file downloads.
We’ll need a client, a server, and of course a proto dir to define our message schemas between the two.
.
├── client
├── proto
└── server
Writing and compiling the protobuf
Let’s dissect the protobuf we’re using.
- We’re using the version 3 syntax (proto3).
- We’re naming our package file.
- We’re setting an option that translates to “when I compile this protobuf into a Go package, I want the name of the package to be filepb”. You can set similar options for other languages since gRPC is language agnostic.
- Our first message schema defines a data structure called fileRequest, and it contains one field of type string called file_name. The 1 is the field number: a unique tag that identifies the field in the serialized binary message, so it shouldn’t change once the message is in use.
- The next message schema defines a fileResponse that has one field of type bytes called shard.
syntax = "proto3";

package file;

option go_package = "filepb";

message fileRequest {
	string file_name = 1;
}

message fileResponse {
	bytes shard = 1;
}

service fileService {
	rpc Download(fileRequest) returns (stream fileResponse) {}
}
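As a quick aside on those field numbers: here’s a rough sketch of how a single string field ends up on the wire, hand-encoded with the standard library only. It assumes the standard protobuf encoding, where the key is the field number shifted left by three bits combined with a wire type, and wire type 2 means "length-delimited". encodeStringField and hexString are hypothetical helpers of mine; in real code the generated types do this for you.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeStringField hand-encodes a single protobuf string field.
// Illustration only; real code should use the generated types.
func encodeStringField(fieldNumber int, value string) []byte {
	const wireTypeLengthDelimited = 2
	var scratch [binary.MaxVarintLen64]byte
	var out []byte

	// Key: field number and wire type packed into one varint.
	n := binary.PutUvarint(scratch[:], uint64(fieldNumber)<<3|wireTypeLengthDelimited)
	out = append(out, scratch[:n]...)

	// Length prefix, then the raw bytes of the string.
	n = binary.PutUvarint(scratch[:], uint64(len(value)))
	out = append(out, scratch[:n]...)
	return append(out, value...)
}

// hexString renders bytes as space-separated hex pairs.
func hexString(b []byte) string { return fmt.Sprintf("% x", b) }

func main() {
	// file_name = 1 carrying the value "a.txt"
	fmt.Println(hexString(encodeStringField(1, "a.txt"))) // prints "0a 05 61 2e 74 78 74"
}
```

The leading 0a is the field number 1 combined with wire type 2, which is why the number identifies the field rather than its position in the message.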
The service definition should look a little different than what we did in my protobuffers post. Do you notice what it is? It’s the stream keyword. The stream keyword defines the type of the endpoint.
It should be interpreted as follows:
- We’re making a service called fileService.
- It has one endpoint called Download.
- Download takes a fileRequest and streams back multiple fileResponses.
When the stream keyword is put in front of a message, it means multiple. Multiple fileResponses will be returned for a single fileRequest. This is the main concept of server streaming. Servers send responses, right? Therefore if an endpoint sends multiple responses, it’s a server streaming endpoint. Am I beating a dead horse yet?
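Fine. Let’s compile this bad boy. Remember the command to compile to Go?
protoc file/proto/file.proto --go_out=plugins=grpc:.
In English this means: the path to the proto file, then a flag naming the language to generate (Go), the plugin to use (grpc), and after the colon the output directory. Here that’s the current directory, so the generated file lands next to the proto. This plugins=grpc form is what older releases of protoc-gen-go accept; newer releases generate the service code with a separate protoc-gen-go-grpc plugin.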
.
├── client
├── proto
│   ├── file.pb.go
│   └── file.proto
└── server
    └── files
Compiling our proto creates a new Go file with a ton of generated code for creating new gRPC server and client instances, as well as getter methods for the data structures we defined in our proto file. You may also notice I added a files dir in server. This is where the server will look for any files the client requests.
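To get a feel for what the generated stream type gives the server, here’s a trimmed-down sketch. It assumes only that the server-side stream exposes a Send method taking a response. downloadStream, recordingStream, and sendAll are hypothetical names of mine, and the local FileResponse struct stands in for the generated one so the snippet runs without gRPC installed.

```go
package main

import "fmt"

// FileResponse mirrors the message from the proto; the real type lives in
// the generated package.
type FileResponse struct {
	Shard []byte
}

// downloadStream is a hypothetical, trimmed-down version of the generated
// server stream interface: only the Send method is kept here.
type downloadStream interface {
	Send(*FileResponse) error
}

// recordingStream is a fake that remembers every shard it is asked to send.
type recordingStream struct {
	shards [][]byte
}

func (r *recordingStream) Send(res *FileResponse) error {
	r.shards = append(r.shards, res.Shard)
	return nil
}

// sendAll pushes each shard into the stream, stopping at the first error.
func sendAll(stream downloadStream, shards ...[]byte) error {
	for _, s := range shards {
		if err := stream.Send(&FileResponse{Shard: s}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	rec := &recordingStream{}
	if err := sendAll(rec, []byte("ab"), []byte("c")); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println("responses sent:", len(rec.shards))
}
```

A fake like this is handy for exercising handler logic without opening a port.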
Implement the server and client
Let’s start with the server and break down the entry point.
server-streaming/server/server.go
package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"path/filepath"

	filepb "github.com/fuskovic/server-streaming/proto"
	"google.golang.org/grpc"
)

var filesDir = "files"

type server struct{}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen on 50051 : %v\n", err)
	}

	s := grpc.NewServer()
	filepb.RegisterFileServiceServer(s, &server{})

	fmt.Println("starting gRPC server on 50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to start server : %v\n", err)
	}
}
- We start listening on port 50051.
- We instantiate a new gRPC server.
- We register the file service with our server.
- We run the server on port 50051.
So far so good. Now let’s look at that Download method.
func (s *server) Download(req *filepb.FileRequest, stream filepb.FileService_DownloadServer) error {
	fileName := req.GetFileName()
	path := filepath.Join(filesDir, fileName)

	fileInfo, err := os.Stat(path)
	if err != nil {
		return err
	}
	fileSize := fileInfo.Size()

	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	var totalBytesStreamed int64
	for totalBytesStreamed < fileSize {
		shard := make([]byte, 1024)
		bytesRead, err := f.Read(shard)
		if err == io.EOF {
			log.Print("download complete")
			break
		}
		if err != nil {
			return err
		}
		// Send only the bytes that were actually read. The last shard is
		// usually shorter than 1024 bytes, and sending the whole buffer
		// would pad the downloaded file with zeros.
		if err := stream.Send(&filepb.FileResponse{
			Shard: shard[:bytesRead],
		}); err != nil {
			return err
		}
		totalBytesStreamed += int64(bytesRead)
	}
	return nil
}
- The endpoint is implemented as a method on our server type.
- It takes a FileRequest and a stream as args and returns an error.
- We get the requested filename and join it with our global var filesDir to build the path to the requested file.
- We use os.Stat to check the status of the path. This tells us whether the file exists, and if it does we call its Size method to get the file size.
- We open the file at the path and declare a new variable called totalBytesStreamed which will help us keep track of how much of the file we have already streamed.
- We start a loop that will continue as long as the number of bytes streamed is less than the fileSize.
- On every iteration of the loop, we create a new shard. Our shard is a buffer whose length we have explicitly set to 1024 bytes.
- Next we fill the shard by reading bytes from the requested file into it. It’s important to notice I’m using the file’s Read method, which comes from the os library. If you read my post on file libraries in Go you may remember that this method leaves the cursor position where it is after a read operation. This allows us to pick up read operations where they left off when we loop again. Read also returns how many bytes it actually placed in the shard, which can be fewer than 1024 on the last read.
- When the cursor can no longer advance, the method returns an io.EOF err, which lets us know that the entire file has been read.
- Every iteration of the loop finishes by sending a FileResponse containing our shard into the stream.
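The read loop described above can be tried without gRPC at all. Here’s a minimal sketch, assuming any io.Reader as the source and a plain callback in place of stream.Send. streamChunks and chunkSizes are hypothetical names of mine, not part of the project.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// streamChunks reads r in pieces of at most chunkSize bytes and hands each
// piece to send. It returns the total number of bytes handed over.
func streamChunks(r io.Reader, chunkSize int, send func([]byte) error) (int64, error) {
	var total int64
	for {
		chunk := make([]byte, chunkSize) // fresh buffer so send may keep it
		n, err := r.Read(chunk)
		if n > 0 {
			// Only the first n bytes are valid; the rest is zero padding.
			if sendErr := send(chunk[:n]); sendErr != nil {
				return total, sendErr
			}
			total += int64(n)
		}
		if err == io.EOF {
			return total, nil
		}
		if err != nil {
			return total, err
		}
	}
}

// chunkSizes runs streamChunks over an in-memory payload and reports the
// size of every chunk that was sent.
func chunkSizes(payloadLen, chunkSize int) ([]int, int64, error) {
	var sizes []int
	src := bytes.NewReader(make([]byte, payloadLen))
	total, err := streamChunks(src, chunkSize, func(b []byte) error {
		sizes = append(sizes, len(b))
		return nil
	})
	return sizes, total, err
}

func main() {
	sizes, total, err := chunkSizes(2500, 1024)
	fmt.Println(sizes, total, err) // prints "[1024 1024 452] 2500 <nil>"
}
```

Notice the last chunk is only 452 bytes. That’s the case to watch for: whatever gets sent should be trimmed to the number of bytes Read reported.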
Cool, we’ve implemented a server with a Download endpoint. Time to build a client.
server-streaming/client/client.go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"strings"

	// using humanize to format B, MB, GB, etc...
	"github.com/dustin/go-humanize"
	filepb "github.com/fuskovic/server-streaming/proto"
	"google.golang.org/grpc"
)

func main() {
	args := os.Args[1:]
	if len(args) != 1 {
		log.Fatalf("Please provide a filename argument")
	}
	requestedFile := args[0]

	cc, err := grpc.Dial(":50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("failed to establish connection with gRPC server : %v\n", err)
	}
	defer cc.Close()

	c := filepb.NewFileServiceClient(cc)
	if err := download(requestedFile, c); err != nil {
		log.Fatalf("failed to download %s : %v\n", requestedFile, err)
	}
	fmt.Printf("\nsuccessfully downloaded %s\n", requestedFile)
}
We’ll break this down the same way by starting with the entry point and then doing a deep-dive into that download func.
- We’ve implemented our client such that it takes a filename CLI arg for the requested file, so the first few lines just check for the presence of that argument.
- We dial the same port the server is listening on. (You may be noticing that grpc.WithInsecure() param. Just ignore that for now please, because I’m going to do an entirely separate post on securing gRPC connections with SSL/TLS.)
- Moving on, we create a new client instance (very easily thanks to the generated code in our compiled proto file).
- Then we invoke the download func, passing in the requested file and the client.
Let’s see what that download func is doing.
func download(fileName string, client filepb.FileServiceClient) error {
	req := &filepb.FileRequest{
		FileName: fileName,
	}
	stream, err := client.Download(context.Background(), req)
	if err != nil {
		return err
	}

	var downloaded int64
	var buffer bytes.Buffer
	for {
		res, err := stream.Recv()
		if err == io.EOF {
			if err := ioutil.WriteFile(fileName, buffer.Bytes(), 0777); err != nil {
				return err
			}
			break
		}
		if err != nil {
			buffer.Reset()
			return err
		}
		shard := res.GetShard()
		shardSize := len(shard)
		downloaded += int64(shardSize)
		buffer.Write(shard)
		fmt.Printf("\r%s", strings.Repeat(" ", 25))
		fmt.Printf("\r%s downloaded", humanize.Bytes(uint64(downloaded)))
	}
	return nil
}
- download takes the filename of the requested file and a client instance, and returns an error.
- We create a new FileRequest and pass it to client.Download, which returns a stream.
- We declare a new var downloaded to keep track of how many bytes we’ve received from the stream.
- We declare a new var buffer to store incoming byte data from the stream. It’s worth mentioning that bytes.Buffer grows its capacity automatically as bytes are written into it.
- We start a loop that receives responses from the stream.
- When we get a response, we take the number of bytes in the shard and add it to the total number of bytes downloaded.
- We write the shard into the buffer and output our download progress before looping again.
- When the server ends the stream, Recv returns io.EOF, so we write the buffer’s contents to disk and break the loop. Notice we’re using ioutil.WriteFile, which I also covered in my file libraries post. It handles creating a file named filename if it doesn’t exist, opening it, writing to it, and closing it all in one function.
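The receive loop above holds the whole file in memory until the stream ends. As a variation (not what the code in this post does), here’s a minimal sketch that writes each shard to an io.Writer as it arrives. It assumes a recv callback standing in for stream.Recv followed by GetShard. receiveAll, fakeRecv, and assemble are hypothetical names of mine.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// receiveAll pulls shards from recv until it reports io.EOF and writes each
// one to w as it arrives. It returns the number of bytes written.
func receiveAll(recv func() ([]byte, error), w io.Writer) (int64, error) {
	var downloaded int64
	for {
		shard, err := recv()
		if errors.Is(err, io.EOF) {
			return downloaded, nil // the sender finished the stream cleanly
		}
		if err != nil {
			return downloaded, err
		}
		n, err := w.Write(shard)
		downloaded += int64(n)
		if err != nil {
			return downloaded, err
		}
	}
}

// fakeRecv returns a recv function that yields the given shards, then io.EOF.
func fakeRecv(shards ...string) func() ([]byte, error) {
	i := 0
	return func() ([]byte, error) {
		if i >= len(shards) {
			return nil, io.EOF
		}
		s := shards[i]
		i++
		return []byte(s), nil
	}
}

// assemble runs receiveAll against fake shards and returns what was written.
func assemble(shards ...string) (string, int64, error) {
	var sb strings.Builder
	n, err := receiveAll(fakeRecv(shards...), &sb)
	return sb.String(), n, err
}

func main() {
	out, n, err := assemble("hel", "lo ", "world")
	fmt.Printf("%q %d %v\n", out, n, err) // prints "hello world" 11 <nil>
}
```

With a real download, w could be an *os.File, so memory use stays around one shard no matter how big the file is. The trade-off is that a failed stream leaves a partial file behind that you’d need to clean up.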
The funny formatting you see when we output progress can be interpreted as follows: move to the beginning of the current line and overwrite it with 25 blank spaces, then move to the beginning of the line again and output the number of bytes downloaded, formatted by the humanize package I imported. The purpose of this is to simulate a changing number for the amount downloaded in the CLI.
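If you want to poke at the progress trick on its own, here’s a minimal sketch using only the standard library. formatBytes is a simplified, hypothetical stand-in for humanize.Bytes (decimal units, always one decimal place above 1 kB), so its output won’t match the real package exactly. progressLine is also a name of mine.

```go
package main

import (
	"fmt"
	"strings"
)

// formatBytes is a simplified, hypothetical stand-in for humanize.Bytes.
// It uses decimal (SI) units and prints one decimal place from 1 kB up.
func formatBytes(n uint64) string {
	if n < 1000 {
		return fmt.Sprintf("%d B", n)
	}
	value := float64(n)
	unit := ""
	for _, u := range []string{"kB", "MB", "GB", "TB"} {
		value /= 1000
		unit = u
		if value < 1000 {
			break
		}
	}
	return fmt.Sprintf("%.1f %s", value, unit)
}

// progressLine builds the text printed on every iteration: return to the
// start of the line, blank it out, return again, then print the new total.
func progressLine(downloaded uint64, width int) string {
	return "\r" + strings.Repeat(" ", width) + "\r" + formatBytes(downloaded) + " downloaded"
}

func main() {
	// Each line overwrites the previous one in a terminal.
	for _, n := range []uint64{512, 150000, 23000000} {
		fmt.Print(progressLine(n, 25))
	}
	fmt.Println()
}
```

The blanking step matters when the new text is shorter than the old one; without it, leftover characters from the previous line would stick around.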
I filmed a small 30 second video of my desktop to use as a test file and placed it in server-streaming/server/files/. This is the dir our server will check for requested files.
.
├── client
│   └── client.go
├── proto
│   ├── file.pb.go
│   └── file.proto
└── server
    ├── files
    │   └── testvideo.mp4
    └── server.go
Ok, we’re ready to test this out. I’m going to open two shells: the first in server/ and another in client/.
In the first shell I’ll start the server :
go run server.go
starting gRPC server on 50051
In the second we’ll initiate the file request from the client :
go run client.go testvideo.mp4
23 MB downloaded
successfully downloaded testvideo.mp4
Running another tree -L 3 shows us the video file has been downloaded by the client.
.
├── client
│   ├── client.go
│   └── testvideo.mp4
├── proto
│   ├── file.pb.go
│   └── file.proto
└── server
    ├── files
    │   └── testvideo.mp4
    └── server.go
That’s it! We’ve successfully demonstrated a server-streaming gRPC API with file downloads. I’ve pushed up the repo so anyone can play with the code and try it out themselves.
Much love,
-Faris