gRPC 구현 - Client Streaming RPC #2
이번 글에서는 gRPC 의 Client Streaming RPC 예제를 구현 해보도록 한다. Client Streaming RPC 는 다량의 데이터를 스트리밍 방식으로 전송한다는 점에서 지난 글에서 다뤘던 Unary RPC 와 다르다. (아래 그림에서 왼쪽 아래)
gRPC Learning (Part 1) - Yang(Bruce) Li - Medium
Client Streaming RPC의 구현
시작하기
Client Streaming RPC는 큰 데이터를 서버로 전송하고 단항으로 응답을 받는 서비스에 적합한 모델이다. 예를 들어 큰 이미지들을 전송해야 한다던지, 다량의 데이터를 서버로 전송해야 하는 상황에 적합하다. 다음은 이번 예제의 디렉토리 구조이다.
// 입장 클라이언트 path
$ mkdir -p cmd/stream-client/client
// 서버 path
$ mkdir -p cmd/stream-client/server
// gRPC 라이브러리 path
$ mkdir -p core/stream-client/
core/stream-client/stream-client.proto
어떤 예제를 작성할까 고민하다가 지난 글(gRPC 구현 - Unary RPC #1)에 구현했던 protobuf 명세를 수정해서 사용하기로 했다.
이전 명세와 가장 크게 다른 부분은 rpc 선언부에 인자값으로 Guest 메시지를 stream 을 받겠다고 명시한 부분과 Guest 메시지에 bytes 데이터 포멧으로 이미지를 받도록 명시한 부분이다.
syntax = "proto3";
package stream_client;
// Room 서비스를 생성
service Room {
// Guest메시지를 스트림으로 전달하겠다고 정의함
rpc Entry (stream Guest) returns (Message);
}
message Message {
string message = 1;
}
message Guest {
string name = 1;
int32 age = 2;
// 아바타이미지 데이터 필드 추가
bytes avatar = 3;
}
make go code
새로운 코드를 만들어 보자
protoc --proto_path=core/stream-client --go_out=plugins=grpc:cor/stream-client/ stream-client.proto
cmd/stream-client/server/main.go
이전에 작성해봤던 서버코드와 크게 다르지 않다. protoc에서 생성된 인터페이스를 찾아서 ~바인딩할 구조체~ (room) 를 구현하면 된다.
서버코드에서 눈여겨 봐야할 부분은 stream 을 수신 받는 부분과 종료하는 부분이며 주석처리한 부분을 살펴보자.
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"io"
pb "learn-grpc/core/stream-client"
"net"
"os"
)
type room struct{}
// protoc 로 생성한 stream-client.pb.go 파일내 정의된 interface 를 참조하여 room struct 를 구현하면 된다.
func (t *room) Entry(stream pb.Room_EntryServer) error {
// [] 스트리밍 종료시까지 아바타 이미지를 저장한다.
for {
// [] 임시 파일 생성
file, e := os.Create("temp")
if e != nil {
logrus.Error(e)
return e
}
// [] 스트림 데이터 수신
res, e := stream.Recv()
if e == io.EOF {
// 스트리밍 종료 처리
logrus.Info("receive done")
logrus.Info(" ")
file.Close()
os.Remove("temp")
break
}
if e != nil {
logrus.Error(e)
file.Close()
os.Remove("temp")
return nil
}
// 아바타 이미지 저장
file.Write(res.Avatar)
file.Close()
// 파일명 변경
os.Rename("temp", fmt.Sprintf("%s.png", res.Name))
logrus.Info("receive ", res.Name)
}
// 스트리밍 종료시에 단항 메시지 전송
return stream.SendAndClose(&pb.Message{
Message: "success",
})
}
func main() {
l, e := net.Listen("tcp", ":8080")
if e != nil {
logrus.Error(e)
return
}
srv := grpc.NewServer()
wsrv := &room{}
pb.RegisterRoomServer(srv, wsrv)
logrus.Info(fmt.Sprintf("gRPC Server (%s)", l.Addr().String()))
if e := srv.Serve(l); e != nil {
logrus.Error(e)
}
}
proton 에서 작성한 stream-client.pb.go 에서 살펴봐야 하는 부분
room 구조체를 작성할때 stream-client.pb.go 의 RoomServer 인터페이스를 따라 작성하면 된다.
cmd/stream-client/client/main.go
클라이언트에서는 아바타 이미지를 가지고 있는 가상의 사용자를 여럿 만들어서 0.5초 간격으로 서버로 전송시키는 코드를 작성했다. 눈여겨 봐야 할 부분은 스트림 데이터를 반복 전송하고 종료 후 메시지를 받아오는 부분이다.
package main
import (
"context"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"io"
pb "learn-grpc/core/stream-client"
"os"
"time"
)
func load(path string) ([]byte, error) {
file, e := os.Open(path)
if e != nil {
logrus.Error(e)
return nil, e
}
defer file.Close()
done := make([]byte, 0)
buf := make([]byte, 2048)
for {
_, e := file.Read(buf)
if e == io.EOF {
break
}
if e != nil {
return nil, e
}
done = append(done, buf...)
}
return done, nil
}
func main() {
conn, e := grpc.Dial("localhost:8080", grpc.WithInsecure())
if e != nil {
logrus.Error(e)
return
}
defer conn.Close()
// gRPC 클라이언트 접속
c := pb.NewRoomClient(conn)
stream, e := c.Entry(context.Background())
if e != nil {
logrus.Error(e)
return
}
for _, name := range []string{
"karl-1", "sienna-1",
"karl-2", "sienna-2",
"karl-3", "sienna-3",
} {
// [] 가상의 사용자 블럭
member := pb.Guest{Name: name, Age: 10}
// load 는 특정위치의 파일을 읽어 []byte 로 리턴한다.
sp, e := load("./cmd/stream-client/client/avatar.png")
if e != nil {
logrus.Error(e)
return
}
member.Avatar = sp
// [] 스트리밍 발송
stream.Send(&member)
logrus.Info("send")
time.Sleep(500 * time.Millisecond)
}
//
//
res, e := stream.CloseAndRecv()
if e != nil {
logrus.Error(e)
return
}
logrus.Info("Final response", res)
}
구현 결과
실행 결과는 구현한 바 그대로 가상의 사용자 이름으로 이미지 파일이 생성 되었고, 서버 단에서도 이를 올바르게 처리 했다.