博客 / 詳情

返回

go-zero docker-compose 搭建課件服務(九):http統一返回和集成日誌服務

0、索引

go-zero docker-compose 搭建課件服務(九):http統一返回和集成日誌服務

0.1源碼地址

https://github.com/liuyuede123/go-zero-courseware

1、http統一返回

一般返回中會有codemessagedata。當請求成功的時候code返回0或者200,message返回success,data為要獲取的數據;當請求失敗的時候code返回自定義的錯誤碼,message返回展示給前端的錯誤信息,data為空。

我們將封裝一個錯誤返回的函數,應用到api handler的返回

在user服務中創建了common文件夾,裏面存一些公用的方法,創建response/response.go

package response

import (
    "go-zero-courseware/user/common/xerr"
    "net/http"

    "github.com/pkg/errors"
    "github.com/zeromicro/go-zero/core/logx"
    "github.com/zeromicro/go-zero/rest/httpx"
    "google.golang.org/grpc/status"
)

type Response struct {
    Code    uint32      `json:"code"`
    Message string      `json:"message"`
    Data    interface{} `json:"data"`
}

//http返回
func HttpResult(r *http.Request, w http.ResponseWriter, resp interface{}, err error) {

    if err == nil {
        //成功返回
        r := &Response{
            Code:    0,
            Message: "success",
            Data:    resp,
        }
        httpx.WriteJson(w, http.StatusOK, r)
    } else {
        //錯誤返回
        errcode := uint32(500)
        errmsg := "服務器錯誤"

        causeErr := errors.Cause(err)                // err類型
        if e, ok := causeErr.(*xerr.CodeError); ok { //自定義錯誤類型
            //自定義CodeError
            errcode = e.GetErrCode()
            errmsg = e.GetErrMsg()
        } else {
            if gstatus, ok := status.FromError(causeErr); ok { // grpc err錯誤
                grpcCode := uint32(gstatus.Code())
                errcode = grpcCode
                errmsg = gstatus.Message()
            }
        }

        logx.WithContext(r.Context()).Errorf("【API-ERR】 : %+v ", err)

        httpx.WriteJson(w, http.StatusBadRequest, &Response{
            Code:    errcode,
            Message: errmsg,
            Data:    nil,
        })
    }
}

創建xerr/errors.go文件,定義CodeError結構體

package xerr

import (
    "fmt"
)

/**
常用通用固定錯誤
*/
type CodeError struct {
    errCode uint32
    errMsg  string
}

//返回給前端的錯誤碼
func (e *CodeError) GetErrCode() uint32 {
    return e.errCode
}

//返回給前端顯示端錯誤信息
func (e *CodeError) GetErrMsg() string {
    return e.errMsg
}

func (e *CodeError) Error() string {
    return fmt.Sprintf("ErrCode:%d,ErrMsg:%s", e.errCode, e.errMsg)
}

func NewErrCodeMsg(errCode uint32, errMsg string) *CodeError {
    return &CodeError{errCode: errCode, errMsg: errMsg}
}

由於api一般調用的rpc的請求,獲取到的錯誤無法展示給前端使用,我們會使用自定義的錯誤類型。當讓rpc中的錯誤也可能是前端直接可以展示的錯誤,或者是數據庫的某個異常拋出的錯誤,如果想區分這些錯誤,可以自己定義業務端code和message做下區分就行。這裏我們統一api服務中處理。

當api或者rpc中有一些未知錯誤拋出的時候我們需要寫入到日誌中,包括具體的錯誤信息和堆棧信息。這些後續放到日誌服務ELK中可以方便查看。

修改userinfohandler.go、userloginhandler.go、userregisterhandler.go中的返回

...

response.HttpResult(r, w, resp, err)

修改userinfologic.go

...

func (l *UserInfoLogic) UserInfo(req *types.UserInfoRequest) (resp *types.UserInfoResponse, err error) {
    info, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &userclient.UserInfoRequest{
        Id: req.Id,
    })
    if err != nil {
    // 自定義的錯誤返回
        return nil, xerr.NewErrCodeMsg(500, "用户查詢失敗")
    }

    return &types.UserInfoResponse{
        Id:        info.Id,
        Username:  info.Username,
        LoginName: info.LoginName,
        Sex:       info.Sex,
    }, nil
}

修改userloginlogic.go

...

func (l *UserLoginLogic) UserLogin(req *types.LoginRequest) (resp *types.LoginResponse, err error) {
    login, err := l.svcCtx.UserRpc.Login(l.ctx, &userclient.LoginRequest{
        LoginName: req.LoginName,
        Password:  req.Password,
    })
    if err != nil {
        return nil, xerr.NewErrCodeMsg(500, "用户登錄失敗")
    }

    now := time.Now().Unix()
    login.Token, err = l.getJwtToken(l.svcCtx.Config.Auth.AccessSecret, now, l.svcCtx.Config.Auth.AccessExpire, int64(login.Id))
    if err != nil {
    // 返回錯誤信息,並打印堆棧信息到日誌
        return nil, errors.Wrapf(xerr.NewErrCodeMsg(5000, "token生成失敗"), "loginName: %s,err:%v", req, err)
    }
    return &types.LoginResponse{
        Id:    login.Id,
        Token: login.Token,
    }, nil
}

...

修改userregisterlogic.go

...

func (l *UserRegisterLogic) UserRegister(req *types.RegisterRequest) (resp *types.RegisterResponse, err error) {
    _, err = l.svcCtx.UserRpc.Register(l.ctx, &userclient.RegisterRequest{
        LoginName: req.LoginName,
        Username:  req.Username,
        Password:  req.Password,
        Sex:       req.Sex,
    })
    if err != nil {
    // 自定義的錯誤返回
        return nil, xerr.NewErrCodeMsg(5000, "註冊用户失敗")
    }

    return &types.RegisterResponse{}, nil
}

關於errors.Wrapf

第一個參數是錯誤信息,第二個是格式化之後的錯誤信息字符串,args是fromat中的動態參數。最終還是返回我們傳入的error,但是會把堆棧信息也打印出來。這個為後面的日誌服務做鋪墊

func Wrapf(err error, format string, args ...interface{}) error {
    if err == nil {
        return nil
    }
    err = &withMessage{
        cause: err,
        msg:   fmt.Sprintf(format, args...),
    }
    return &withStack{
        err,
        callers(),
    }
}

關於鑑權

對於鑑權,如果鑑權失敗,之前是直接返回401狀態碼,但是我們想同樣的返回錯誤信息和message

此時就需要自定義一個鑑權失敗的回調函數

我們在response.go中增加一個鑑權失敗的回調函數

...

func JwtUnauthorizedResult(w http.ResponseWriter, r *http.Request, err error) {
    httpx.WriteJson(w, http.StatusUnauthorized, &Response{401, "鑑權失敗", nil})
}

然後在api入口程序user.go中修改代碼如下

...

func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)

  // 此處加入鑑權失敗的回調
    server := rest.MustNewServer(c.RestConf, rest.WithUnauthorizedCallback(response.JwtUnauthorizedResult))
    defer server.Stop()

    ctx := svc.NewServiceContext(c)
    handler.RegisterHandlers(server, ctx)

    fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
    server.Start()
}

然後我們再看下user的rpc服務

這裏我們會引入一個攔截器。什麼是攔截器?

定義:UnaryServerInterceptor 提供了一個鈎子來攔截服務器上一元 RPC 的執行。 信息包含攔截器可以操作的這個 RPC 的所有信息。 處理程序是包裝器服務方法實現。 攔截器負責調用處理程序完成 RPC。

其實就是攔截handler做一些返回前和返回後的處理

我們需要在common中新增一個攔截器方法,新建文件rpcserver/rpcserver.go

package rpcserver

import (
    "context"
    "github.com/pkg/errors"
    "github.com/zeromicro/go-zero/core/logx"
    "go-zero-courseware/user/common/xerr"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func LoggerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {

    resp, err = handler(ctx, req)
    if err != nil {
        causeErr := errors.Cause(err)                // err類型
        if e, ok := causeErr.(*xerr.CodeError); ok { //自定義錯誤類型
            logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v", err)

            //轉成grpc err
            err = status.Error(codes.Code(e.GetErrCode()), e.GetErrMsg())
        } else {
            logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v", err)
        }

    }

    return resp, err
}

然後在入口文件user.go中添加一個攔截器

...

s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)

...

課件服務和上面類似,這裏就不一一添加修改了

2、集成日誌服務

我們需要搭建一個ELK體系的服務,流程圖如下:
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707138.png

將會用到以下服務:

服務名 端口號
elasticsearch 9200
kibana 5601
go-stash
filebeat
zookeeper 2181
kafka 9092

docker-compose如下:

user服務中我們引入了日誌地址,到我們的宿主機上。之所以這樣做,是因為在mac系統上docker的日誌文件路徑和linux上的不一致。找了半天也沒在mac上找到容器的日誌。所以用户服務中的日誌會寫到文件中然後同步到宿主機的data/log目錄下。

還有就是filebeat日誌中,我們會從宿主機上的日誌同步到filebeat指定目錄。然後filebeat會同步到kafka

version: '3.5'
# 網絡配置
networks:
  backend:
    driver: bridge

# 服務容器配置
services:
  etcd: # 自定義容器名稱
    build:
      context: etcd                    # 指定構建使用的 Dockerfile 文件
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
    ports: # 設置端口映射
      - "2379:2379"
    networks:
      - backend
    restart: always

  etcd-manage:
    build:
      context: etcd-manage
    environment:
      - TZ=Asia/Shanghai
    ports:
      - "7000:8080"                    # 設置容器8080端口映射指定宿主機端口,用於宿主機訪問可視化web
    depends_on: # 依賴容器
      - etcd                                          # 在 etcd 服務容器啓動後啓動
    networks:
      - backend
    restart: always

  courseware-rpc: # 自定義容器名稱
    build:
      context: courseware                 # 指定構建使用的 Dockerfile 文件
      dockerfile: rpc/Dockerfile
    environment: # 設置環境變量
      - TZ=Asia/Shanghai
    privileged: true
    ports: # 設置端口映射
      - "9400:9400"  # 課件服務rpc端口
    stdin_open: true                     # 打開標準輸入,可以接受外部輸入
    tty: true
    networks:
      - backend
    restart: always                      # 指定容器退出後的重啓策略為始終重啓

  courseware-api: # 自定義容器名稱
    build:
      context: courseware                  # 指定構建使用的 Dockerfile 文件
      dockerfile: api/Dockerfile
    environment: # 設置環境變量
      - TZ=Asia/Shanghai
    privileged: true
    ports: # 設置端口映射
      - "8400:8400"  # 課件服務api端口
    stdin_open: true                     # 打開標準輸入,可以接受外部輸入
    tty: true
    networks:
      - backend
    restart: always                      # 指定容器退出後的重啓策略為始終重啓

  user-rpc: # 自定義容器名稱
    build:
      context: user                 # 指定構建使用的 Dockerfile 文件
      dockerfile: rpc/Dockerfile
    environment: # 設置環境變量
      - TZ=Asia/Shanghai
    privileged: true
    volumes:
      - ./data/log/user-rpc:/var/log/go-zero/user-rpc # 日誌的映射地址
    ports: # 設置端口映射
      - "9300:9300"  # 課件服務rpc端口
    stdin_open: true                     # 打開標準輸入,可以接受外部輸入
    tty: true
    networks:
      - backend
    restart: always                      # 指定容器退出後的重啓策略為始終重啓

  user-api: # 自定義容器名稱
    build:
      context: user                  # 指定構建使用的 Dockerfile 文件
      dockerfile: api/Dockerfile
    environment: # 設置環境變量
      - TZ=Asia/Shanghai
    privileged: true
    volumes:
      - ./data/log/user-api:/var/log/go-zero/user-api
    ports: # 設置端口映射
      - "8300:8300"  # 課件服務api端口
    stdin_open: true                     # 打開標準輸入,可以接受外部輸入
    tty: true
    networks:
      - backend
    restart: always                      # 指定容器退出後的重啓策略為始終重啓

  elasticsearch:
    build:
      context: ./elasticsearch
    environment:
      - TZ=Asia/Shanghai
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    privileged: true
    ports:
      - "9200:9200"
    networks:
      - backend
    restart: always

  prometheus:
    build:
      context: ./prometheus
    environment:
      - TZ=Asia/Shanghai
    privileged: true
    volumes:
      - ./prometheus/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml  # 將 prometheus 配置文件掛載到容器裏
      - ./prometheus/target.json:/opt/bitnami/prometheus/conf/targets.json  # 將 prometheus 配置文件掛載到容器裏
    ports:
      - "9090:9090"                     # 設置容器9090端口映射指定宿主機端口,用於宿主機訪問可視化web
    networks:
      - backend
    restart: always

  grafana:
    build:
      context: ./grafana
    environment:
      - TZ=Asia/Shanghai
    privileged: true
    ports:
      - "3000:3000"
    networks:
      - backend
    restart: always

  jaeger:
    build:
      context: ./jaeger
    environment:
      - TZ=Asia/Shanghai
      - SPAN_STORAGE_TYPE=elasticsearch
      - ES_SERVER_URLS=http://elasticsearch:9200
      - LOG_LEVEL=debug
    privileged: true
    ports:
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "4317:4317"
      - "4318:4318"
      - "14250:14250"
      - "14268:14268"
      - "14269:14269"
      - "9411:9411"
    networks:
      - backend
    restart: always

  kibana:
    build:
      context: ./kibana
    environment:
      - elasticsearch.hosts=http://elasticsearch:9200
      - TZ=Asia/Shanghai
    privileged: true
    ports:
      - "5601:5601"
    networks:
      - backend
    restart: always
    depends_on:
      - elasticsearch

  go-stash:
    build:
      context: ./go-stash
    environment:
      - TZ=Asia/Shanghai
    privileged: true
    volumes:
      - ./go-stash/go-stash.yml:/app/etc/config.yaml
    networks:
      - backend
    restart: always
    depends_on:
      - elasticsearch
      - kafka

  filebeat:
    build:
      context: ./filebeat
    environment:
      - TZ=Asia/Shanghai
    entrypoint: "filebeat -e -strict.perms=false"
    privileged: true
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
      - ./data/log:/var/lib/docker/containers # 宿主機上的日誌同步到filebeat指定目錄
    networks:
      - backend
    restart: always
    depends_on:
      - kafka

  zookeeper:
    build:
      context: ./zookeeper
    environment:
      - TZ=Asia/Shanghai
    privileged: true
    networks:
      - backend
    ports:
      - "2181:2181"
    restart: always

  kafka:
    build:
      context: ./kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - TZ=Asia/Shanghai
      - ALLOW_PLAINTEXT_LISTENER=yes
    restart: always
    privileged: true
    networks:
      - backend
    depends_on:
      - zookeeper

(項目根目錄下自行創建對應的Dokcerfile)

filebeat需要引入配置文件filebeat.yml如下:

其中filebeat需要從宿主機同步數據,就是上面用户服務中生成的日誌文件,會同步到filebeat的對應文件中

拉取過來的文件會輸出到kafka指定的topic中,我們這裏定義的是courseware-log

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/lib/docker/containers/*/*.log # 此為宿主機同步過來的日誌文件

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

processors:
  - add_cloud_metadata: ~
  - add_docker_metadata: ~

output.kafka:
  enabled: true
  hosts: ["kafka:9092"]
  #要提前創建topic
  topic: "courseware-log"
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1

用户服務中也需要修改etc下的user.yaml配置,增加日誌的配置,輸出到data/log目錄下

Log:
  Mode: file
  Path: /var/log/go-zero/user-api
  Level: error
Log:
  Mode: file
  Path: /var/log/go-zero/user-rpc
  Level: error

我們啓動下相關服務,請求下user-api的接口
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707193.png

然後回到項目中查看data/log中是否生成相關日誌
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707211.png

日誌正常輸出,再到filebeat服務中,查看文件是否同步上去:

# 進入容器
docker exec -it 231bf79f3d5e21cea153bd94bf29693e67360113256e0e3c67a693e727d0b660 /bin/sh
# 查看目錄
cd /var/lib/docker/containers
ls
user-api  user-rpc

然後我們再到kafka的容器中

# 進入到容器
docker exec -it cb764aeb86e8296a805e47c85f65ac5334c3ed15630fe36e7a39a81ca1bad67f /bin/sh

# 到bin目錄下
cd /opt/bitnami/kafka/bin

# 可以看到這些調試腳本
$ ls
connect-distributed.sh          kafka-cluster.sh         kafka-consumer-perf-test.sh  kafka-get-offsets.sh    kafka-producer-perf-test.sh    kafka-server-stop.sh           kafka-verifiable-consumer.sh     zookeeper-server-start.sh
connect-mirror-maker.sh       kafka-configs.sh         kafka-delegation-tokens.sh   kafka-leader-election.sh    kafka-reassign-partitions.sh   kafka-storage.sh               kafka-verifiable-producer.sh     zookeeper-server-stop.sh
connect-standalone.sh          kafka-console-consumer.sh  kafka-delete-records.sh      kafka-log-dirs.sh        kafka-replica-verification.sh  kafka-streams-application-reset.sh  trogdor.sh                zookeeper-shell.sh
kafka-acls.sh              kafka-console-producer.sh  kafka-dump-log.sh          kafka-metadata-shell.sh    kafka-run-class.sh           kafka-topics.sh               windows
kafka-broker-api-versions.sh  kafka-consumer-groups.sh     kafka-features.sh          kafka-mirror-maker.sh    kafka-server-start.sh           kafka-transactions.sh           zookeeper-security-migration.sh
$

先看下有沒有創建courseware-log的topic,如果沒有就創建一個

$ ./kafka-topics.sh --bootstrap-server kafka:9092 --list
__consumer_offsets
courseware-log

# 沒有就創建,創建的命令。最新版的kafka不需要指定zookeeper
./kafka-topics.sh  --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic courseware-log

# 建錯了刪除用這個
./kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic courseware-log

# 發佈消息用這個
./kafka-console-producer.sh --broker-list kafka:9092 --topic courseware-log

# 消費用這個
./kafka-console-consumer.sh --bootstrap-server kafka:9092  --topic courseware-log --from-beginning

我們執行消費腳本看下日誌會不會過來。
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707248.png

現在還沒有日誌進來,我們請求一下接口讓接口報錯,可以看到日誌開始消費了
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707264.png

到這裏日誌已經流轉到kafka中了。

下面是go-stash從kafka拉取日誌處理並保存到elasticsearch的流程:

go-stash需要引入配置文件go-stash.yml,內容如下:

參數可參考github go-stash

Clusters:
  - Input:
      Kafka:
        Name: go-stash
        Brokers:
          - "kafka:9092"
        Topics:
          - courseware-log
        Group: pro
        Consumers: 16
    Filters:
      - Action: drop
        Conditions:
          - Key: k8s_container_name
            Value: "-rpc"
            Type: contains
          - Key: level
            Value: info
            Type: match
            Op: and
      - Action: remove_field
        Fields:
          # - message
          - _source
          - _type
          - _score
          - _id
          - "@version"
          - topic
          - index
          - beat
          - docker_container
          - offset
          - prospector
          - source
          - stream
          - "@metadata"
      - Action: transfer
        Field: message
        Target: data
    Output:
      ElasticSearch:
        Hosts:
          - "http://elasticsearch:9200"
        Index: "courseware-{{yyyy-MM-dd}}"

問題:

但是這裏mac上又遇到一個問題就是對接go-stash時mac上的docker中會報錯

2022/09/08 21:51:10 {"@timestamp":"2022-09-08T21:51:10.346+08:00","level":"error","content":"cpu_linux.go:29 open cpuacct.usage_percpu: no such file or directory"}

具體可以看這裏https://github.com/zeromicro/... 還沒有找到好的解決辦法。

後續:

之後又重啓了下docker發現問題解決了,同步到es生效了。

接下來我們請求下用户服務的接口,到es查看,索引已經創建,錯誤信息已經寫進去了
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707292.png
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707306.png

然後我們訪問http://127.0.0.1:5601/進到kibana後台,點擊Discover,並創建索引
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707320.png
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707333.png

搜索到課件服務的索引後點擊下一步
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707354.png

選擇@timestamp,點擊創建
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707368.png

重新點擊Discover之後可以看到課件的日誌服務創建完成
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662707382.png

user avatar yinggaozhen 頭像 boywus 頭像
2 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.