Go source code analysis: mc and minio-go

golangLeetcode
Published 2022-08-03 13:51:35

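Object Storage Service (OSS) is a massive-scale, secure, low-cost, highly reliable cloud storage service suited to storing files of any type. Capacity and processing power scale elastically, and several storage classes are available to optimize storage cost. Besides serving directly as object storage, MinIO can also act as a gateway layer in front of cloud object storage services, connecting seamlessly to Amazon S3 and Microsoft Azure. Before studying the MinIO server source code, we first read through its client, mc, and its Go SDK, minio-go.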

https://github.com/minio/minio-go
https://github.com/minio/mc

1. mc

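mc is a command-line wrapper built on top of minio-go. Its commonly used commands are:

ls       List files and folders.
mb       Make a bucket or a folder.
cat      Display file and object contents.
pipe     Redirect STDIN to an object, a file, or STDOUT.
share    Generate a URL for sharing.
cp       Copy files and objects.
mirror   Mirror buckets and folders.
find     Find files that match the given parameters.
diff     Show the differences between two folders or buckets.
rm       Remove files and objects.
events   Manage object notifications.
watch    Watch for file and object events.
policy   Manage access policies.
session  Manage saved sessions for the cp command.
config   Manage the mc configuration file.
update   Check for software updates.
version  Print version information.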

The entry point is, as usual, main.go:

func main() {
  mc.Main(os.Args)
}

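Each command is implemented under the cmd directory. Because mc's commands use their own help templates, mc does not use the popular cobra library; it relies on its own framework, minio/cli. Let's start with the Main function, defined in cmd/main.go:

func Main(args []string) {
  // ...
  mainComplete()
  // ...
  defer profile.Start(profile.CPUProfile, profile.ProfilePath(mustGetProfileDir())).Stop()
  // ...
  probe.Init()
  // ...
  if err := registerApp(appName).Run(args); err != nil {
    // ...
  }
}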

mainComplete is defined in cmd/auto-complete.go:

func mainComplete() error {
  // ...
  for _, cmd := range appCmds {
    if cmd.Hidden {
      continue
    }
    complCmds[cmd.Name] = cmdToCompleteCmd(cmd, "")
  }
  // ...
  mcComplete := complete.Command{
    Sub:         complCmds,
    GlobalFlags: complFlags,
  }
  // ...
  complete.New(filepath.Base(os.Args[0]), mcComplete).Run()
  // ...
}

It registers every non-hidden command in appCmds as an mc subcommand for shell completion. appCmds is the list of subcommands, defined in cmd/main.go:

var appCmds = []cli.Command{
  aliasCmd,
  lsCmd,
  mbCmd,
  rbCmd,
  cpCmd,
  mirrorCmd,
  catCmd,
  headCmd,
  pipeCmd,
  shareCmd,
  findCmd,
  sqlCmd,
  statCmd,
  mvCmd,
  treeCmd,
  duCmd,
  retentionCmd,
  legalHoldCmd,
  diffCmd,
  rmCmd,
  versionCmd,
  ilmCmd,
  encryptCmd,
  eventCmd,
  watchCmd,
  undoCmd,
  anonymousCmd,
  policyCmd,
  tagCmd,
  replicateCmd,
  adminCmd,
  configCmd,
  updateCmd,
}

probe.Init is defined in pkg/probe/probe.go:

func Init() {
  _, file, _, _ := runtime.Caller(1)
  rootPath = filepath.Dir(file)
  // ...
}

The Run method belongs to the minio/cli framework. The relevant pieces are defined in two files.

minio/cli@v1.22.0/app.go

func (a *App) Run(arguments []string) (err error) {
  a.Setup()
  // ...
  c := a.Command(name)
  if c != nil {
    return c.Run(context)
  }
  // ...
}

minio/cli@v1.22.0/command.go

func (c Command) Run(ctx *Context) (err error) {
  // ...
  err = HandleAction(c.Action, context)
  // ...
}
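
The dispatch chain above (App.Run looks up a Command by name, then Command.Run invokes its Action) can be illustrated with a minimal sketch. This is not the minio/cli implementation: the Command and App types, newApp, and the toy actions are hypothetical stand-ins that assume only the Name and Action fields seen in the excerpts.

```go
package main

import (
	"errors"
	"fmt"
)

// Command is a hypothetical, stripped-down stand-in for cli.Command.
type Command struct {
	Name   string
	Action func(args []string) error
}

// App holds the registered subcommands, playing the role of appCmds.
type App struct {
	Commands []Command
}

// Run treats arguments[0] as the program name and arguments[1] as the
// subcommand, then invokes that subcommand's Action with the rest.
func (a *App) Run(arguments []string) error {
	if len(arguments) < 2 {
		return errors.New("no command given")
	}
	name := arguments[1]
	for _, c := range a.Commands {
		if c.Name == name {
			return c.Action(arguments[2:])
		}
	}
	return fmt.Errorf("unknown command %q", name)
}

// newApp registers two toy commands (hypothetical; they only print).
func newApp() *App {
	return &App{Commands: []Command{
		{Name: "ls", Action: func(args []string) error {
			fmt.Println("ls called with", args)
			return nil
		}},
		{Name: "tree", Action: func(args []string) error {
			fmt.Println("tree called with", args)
			return nil
		}},
	}}
}

func main() {
	// A fixed argument vector keeps the demo independent of os.Args.
	if err := newApp().Run([]string{"mc", "tree", "play/bucket"}); err != nil {
		fmt.Println("error:", err)
	}
}
```

Keeping the commands in a plain slice mirrors how appCmds is declared: adding a subcommand only requires appending one more entry.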

Taking the tree command as an example, let's look at the implementation details in cmd/tree-main.go:

var treeCmd = cli.Command{
  Name:         "tree",
  Usage:        "list buckets and objects in a tree format",
  Action:       mainTree,
  OnUsageError: onUsageError,
  Before:       setGlobalsFromContext,
  Flags:        append(treeFlags, globalFlags...),
  // CustomHelpTemplate: ...
}

The action it runs is mainTree:

func mainTree(cliCtx *cli.Context) error {
  // ...
  args, depth, includeFiles, timeRef := parseTreeSyntax(ctx, cliCtx)
  // ...
  if e := doTree(ctx, targetURL, timeRef, 1, false, "", depth, includeFiles); e != nil {
    // ...
  }
  // ...
  clnt, err := newClientFromAlias(targetAlias, targetURL)
  // ...
  e := doList(ctx, clnt, true, false, false, timeRef, false)
  // ...
}

doTree initializes a client and calls its List method:

func doTree(ctx context.Context, url string, timeRef time.Time, level int, leaf bool, branchString string, depth int, includeFiles bool) error {
  // ...
  clnt, err := newClientFromAlias(targetAlias, targetURL)
  // ...
  show := func(end bool) error {
    // ...
  }
  // ...
  for content := range clnt.List(ctx, ListOptions{Recursive: false, TimeRef: timeRef, ShowDir: DirFirst}) {
    // ...
  }
  // ...
}

Client is an interface, defined in cmd/client.go:

type Client interface {
  // Common operations
  Stat(ctx context.Context, opts StatOptions) (content *ClientContent, err *probe.Error)
  List(ctx context.Context, opts ListOptions) <-chan *ClientContent
  // ...
}

It has two concrete implementations: one for the local filesystem and one for S3.

cmd/client-fs.go

func (f *fsClient) List(ctx context.Context, opts ListOptions) <-chan *ClientContent {
  // ...
  go f.listRecursiveInRoutine(contentCh, opts.WithMetadata)
  // ...
  go f.listDirOpt(contentCh, opts.Incomplete, opts.WithMetadata, opts.ShowDir)
  // ...
  go f.listInRoutine(contentCh, opts.WithMetadata)
  // ...
}

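The recursive variant walks the directory tree with Walk, which calls the visit callback for every entry; the results feed the tree-style display:

func (f *fsClient) listRecursiveInRoutine(contentCh chan *ClientContent, isMetadata bool) {
  // ...
  visitFS := func(fp string, fi os.FileInfo, e error) error {
    // ...
  }
  // ...
  e := xfilepath.Walk(dirName, visitFS)
  // ...
}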
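
The pattern used here, a goroutine that walks a directory and streams each entry over a channel, can be sketched with the standard library alone. This is an illustration under assumptions, not mc's code: it uses filepath.WalkDir instead of mc's xfilepath.Walk, and the Entry type, listRecursive, collect, and makeDemoTree are hypothetical names.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sort"
)

// Entry is a hypothetical, simplified counterpart of ClientContent.
type Entry struct {
	Path  string
	IsDir bool
	Err   error
}

// listRecursive walks root in a goroutine and streams every entry on the
// returned channel, which is closed when the walk finishes.
func listRecursive(root string) <-chan Entry {
	ch := make(chan Entry)
	go func() {
		defer close(ch)
		visit := func(p string, d fs.DirEntry, err error) error {
			if err != nil {
				ch <- Entry{Path: p, Err: err}
				return nil // report the error and keep walking
			}
			if p == root {
				return nil // skip the root itself
			}
			ch <- Entry{Path: p, IsDir: d.IsDir()}
			return nil
		}
		_ = filepath.WalkDir(root, visit)
	}()
	return ch
}

// collect drains the channel and returns sorted paths relative to root.
func collect(root string) []string {
	var out []string
	for e := range listRecursive(root) {
		if e.Err != nil {
			continue
		}
		rel, _ := filepath.Rel(root, e.Path)
		out = append(out, filepath.ToSlash(rel))
	}
	sort.Strings(out)
	return out
}

// makeDemoTree creates a temp directory containing a/b/f.txt and returns
// its path together with a cleanup function.
func makeDemoTree() (string, func()) {
	root, err := os.MkdirTemp("", "walk-demo")
	if err != nil {
		panic(err)
	}
	if err := os.MkdirAll(filepath.Join(root, "a", "b"), 0o755); err != nil {
		panic(err)
	}
	if err := os.WriteFile(filepath.Join(root, "a", "b", "f.txt"), []byte("x"), 0o644); err != nil {
		panic(err)
	}
	return root, func() { os.RemoveAll(root) }
}

func main() {
	root, cleanup := makeDemoTree()
	defer cleanup()
	fmt.Println(collect(root))
}
```

Returning a receive-only channel lets the caller start rendering entries before the walk has finished, which is the same shape as the List method of the Client interface.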

cmd/client-s3.go

func (c *S3Client) List(ctx context.Context, opts ListOptions) <-chan *ClientContent {
  // ...
  c.versionedList(ctx, contentCh, opts)
  // ...
  c.unversionedList(ctx, contentCh, opts)
  // ...
}

func (c *S3Client) versionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
  b, o := c.url2BucketAndObject()
  // ...
  buckets, err := c.api.ListBuckets(ctx)
  // ...
  for _, bucket := range buckets {
    // ...
    contentCh <- c.bucketInfo2ClientContent(bucket)
    // ...
    for objectVersion := range c.listVersions(ctx, bucket.Name, "",
      opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers) {
      // ...
    }
  }
  // ...
  c.unversionedList(ctx, contentCh, opts)
  // ...
}

func (c *S3Client) url2BucketAndObject() (bucketName, objectName string) {
  // ...
}

func (c *S3Client) unversionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
  // ...
  c.listIncompleteRecursiveInRoutine(ctx, contentCh, opts)
  // ...
  c.listIncompleteInRoutine(ctx, contentCh, opts)
  // ...
  c.listRecursiveInRoutine(ctx, contentCh, opts)
  // ...
  c.listInRoutine(ctx, contentCh, opts)
  // ...
}

func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
  // ...
  buckets, err := c.api.ListBuckets(ctx)
  // ...
  for _, bucket := range buckets {
    for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive) {
      // ...
    }
  }
  // ...
}

func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
  // ...
  buckets, err := c.api.ListBuckets(ctx)
  // ...
  for object := range c.listObjectWrapper(ctx, bucket.Name, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) {
    // ...
  }
  // ...
}

func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string, isRecursive bool, timeRef time.Time, withVersions, withDeleteMarkers bool, metadata bool, maxKeys int) <-chan minio.ObjectInfo {
  // ...
  return c.listVersions(ctx, bucket, object, isRecursive, timeRef, withVersions, withDeleteMarkers)
  // ...
  c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true, MaxKeys: maxKeys})
  // ...
  c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata, MaxKeys: maxKeys})
  // ...
}

All of these ultimately call the corresponding methods in the SDK:

minio/minio-go/v7@v7.0.16-0.20211108161804-a7a36ee131df/api-list.go

func (c *Client) ListObjects(ctx context.Context, bucketName string, opts ListObjectsOptions) <-chan ObjectInfo {
  // ...
}

func (c *Client) ListIncompleteUploads(ctx context.Context, bucketName, objectPrefix string, recursive bool) <-chan ObjectMultipartInfo {
  return c.listIncompleteUploads(ctx, bucketName, objectPrefix, recursive)
}

Now let's look at another command, ls, which is implemented in much the same way:

cmd/ls-main.go

var lsCmd = cli.Command{
  Name:         "ls",
  Usage:        "list buckets and objects",
  Action:       mainList,
  OnUsageError: onUsageError,
  Before:       setGlobalsFromContext,
  Flags:        append(lsFlags, globalFlags...),
  // CustomHelpTemplate: ...
}

func mainList(cliCtx *cli.Context) error {
  // ...
  if e := doList(ctx, clnt, isRecursive, isIncomplete, isSummary, timeRef, withOlderVersions); e != nil {
    // ...
  }
  // ...
}

cmd/ls.go

func doList(ctx context.Context, clnt Client, isRecursive, isIncomplete, isSummary bool, timeRef time.Time, withOlderVersions bool) error {
  // ...
  for content := range clnt.List(ctx, ListOptions{
    Recursive:         isRecursive,
    Incomplete:        isIncomplete,
    TimeRef:           timeRef,
    WithOlderVersions: withOlderVersions || !timeRef.IsZero(),
    WithDeleteMarkers: true,
    ShowDir:           DirNone,
  }) {
    // ...
  }
  // ...
}

2. SDK: minio-go

Let's first look at how the SDK is used.

1. Create the client object:

minioClient, err := minio.New(endpoint, &minio.Options{
  Creds:  credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
  Secure: useSSL,
})

2. Create a bucket, or check whether the bucket already exists:

err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: location})

exists, errBucketExists := minioClient.BucketExists(ctx, bucketName)

3. Create (upload) a file:

info, err := minioClient.FPutObject(ctx, bucketName, objectName, filePath, minio.PutObjectOptions{ContentType: contentType})

The Client type is defined in api.go:

type Client struct {
  // Standard options.

  // Parsed endpoint url provided by the user.
  endpointURL *url.URL

  // Holds various credential providers.
  credsProvider *credentials.Credentials

  // Custom signerType value overrides all credentials.
  overrideSignerType credentials.SignatureType

  // User supplied.
  appInfo struct {
    appName    string
    appVersion string
  }

  // Indicate whether we are using https or not
  secure bool

  // Needs allocation.
  httpClient     *http.Client
  bucketLocCache *bucketLocationCache

  // Advanced functionality.
  isTraceEnabled  bool
  traceErrorsOnly bool
  traceOutput     io.Writer

  // S3 specific accelerated endpoint.
  s3AccelerateEndpoint string

  // Region endpoint
  region string

  // Random seed.
  random *rand.Rand

  // lookup indicates type of url lookup supported by server. If not specified,
  // default to Auto.
  lookup BucketLookupType

  // Factory for MD5 hash functions.
  md5Hasher    func() md5simd.Hasher
  sha256Hasher func() md5simd.Hasher

  healthStatus int32
}

func New(endpoint string, opts *Options) (*Client, error) {
  // ...
}

The same file also defines executeMethod, the entry point for every HTTP request:

func (c *Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) {
  // ...
  bodyCloser, ok := metadata.contentBody.(io.Closer)
  // ...
  for range c.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) {
    // ...
    req, err = c.newRequest(ctx, method, metadata)
    // ...
    res, err = c.do(req)
    // ...
  }
  // ...
}
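
The excerpt shows each request being issued inside a loop driven by a retry timer. As a rough illustration of that idea, here is a minimal retry loop with capped exponential backoff. It is a sketch under assumptions, not minio-go's newRetryTimer: retryDo and runDemo are hypothetical names, and jitter is left out for brevity.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryDo is a hypothetical helper: it calls fn up to maxAttempts times,
// sleeping unit*2^attempt (capped at maxSleep) between failed attempts, and
// stops early when ctx is cancelled.
func retryDo(ctx context.Context, maxAttempts int, unit, maxSleep time.Duration, fn func(attempt int) error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = fn(attempt); lastErr == nil {
			return nil
		}
		if attempt == maxAttempts-1 {
			break // no point sleeping after the final attempt
		}
		sleep := unit << uint(attempt)
		if sleep > maxSleep || sleep <= 0 {
			sleep = maxSleep
		}
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// runDemo runs an operation that fails `failures` times before succeeding
// and reports how many calls were made.
func runDemo(failures, maxAttempts int) (calls int, err error) {
	err = retryDo(context.Background(), maxAttempts, time.Millisecond, 5*time.Millisecond, func(attempt int) error {
		calls++
		if attempt < failures {
			return errors.New("transient failure")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := runDemo(2, 5)
	fmt.Println("calls:", calls, "err:", err)
}
```

A real client would typically retry only on errors it considers transient and would add random jitter so that many clients do not retry in lockstep.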

The do method is a thin wrapper around the HTTP client's Do method:

func (c *Client) do(req *http.Request) (resp *http.Response, err error) {
  // ...
  resp, err = c.httpClient.Do(req)
  // ...
}

api-put-bucket.go defines the method that creates a bucket:

func (c *Client) MakeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error) {
  return c.makeBucket(ctx, bucketName, opts)
}

func (c *Client) makeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error) {
  // ...
  err = c.doMakeBucket(ctx, bucketName, opts.Region, opts.ObjectLocking)
  // ...
}

which in the end calls the executeMethod method described above:

func (c *Client) doMakeBucket(ctx context.Context, bucketName string, location string, objectLockEnabled bool) (err error) {
  // ...
  createBucketConfigBytes, err = xml.Marshal(createBucketConfig)
  // ...
  reqMetadata.contentMD5Base64 = sumMD5Base64(createBucketConfigBytes)
  // ...
  resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
  // ...
}
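
doMakeBucket marshals a configuration to XML and attaches a base64-encoded MD5 of the body. The two steps can be reproduced with the standard library, as in the sketch below. The bucketConfig struct is a simplified, hypothetical stand-in for the SDK's internal type, and md5Base64 and buildBody are illustrative names rather than the SDK's own helpers.

```go
package main

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/xml"
	"fmt"
)

// bucketConfig is a simplified, hypothetical version of the XML body sent
// with a create-bucket request.
type bucketConfig struct {
	XMLName  xml.Name `xml:"CreateBucketConfiguration"`
	Location string   `xml:"LocationConstraint"`
}

// md5Base64 returns the base64-encoded MD5 digest of data, the format used
// by the HTTP Content-MD5 header.
func md5Base64(data []byte) string {
	sum := md5.Sum(data)
	return base64.StdEncoding.EncodeToString(sum[:])
}

// buildBody marshals the config and returns the bytes plus their checksum.
func buildBody(location string) ([]byte, string, error) {
	body, err := xml.Marshal(bucketConfig{Location: location})
	if err != nil {
		return nil, "", err
	}
	return body, md5Base64(body), nil
}

func main() {
	body, sum, err := buildBody("us-west-2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s\nContent-MD5: %s\n", body, sum)
}
```

Sending the digest alongside the body lets the server detect a payload that was corrupted in transit.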

BucketExists is defined in api-stat.go:

func (c *Client) BucketExists(ctx context.Context, bucketName string) (bool, error) {
  // ...
  resp, err := c.executeMethod(ctx, http.MethodHead, requestMetadata{
    bucketName:       bucketName,
    contentSHA256Hex: emptySHA256Hex,
  })
  // ...
}
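
The excerpt shows that the existence check is a HEAD request against the bucket. A simplified illustration of that idea follows, using an in-process test server instead of a real MinIO endpoint. It is a sketch under assumptions: the request is unsigned, the status-code mapping (200 means present, 404 means absent) is a simplification, and bucketExists and newFakeServer are hypothetical helpers rather than SDK functions.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// bucketExists sends an unsigned HEAD request for /<bucket> and maps the
// status code to a boolean. Real S3 requests also need a signature.
func bucketExists(client *http.Client, endpoint, bucket string) (bool, error) {
	req, err := http.NewRequest(http.MethodHead, endpoint+"/"+bucket, nil)
	if err != nil {
		return false, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK:
		return true, nil
	case http.StatusNotFound:
		return false, nil
	default:
		return false, fmt.Errorf("unexpected status %s", resp.Status)
	}
}

// newFakeServer returns a local test server that knows exactly one bucket.
func newFakeServer(known string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodHead && strings.TrimPrefix(r.URL.Path, "/") == known {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusNotFound)
	}))
}

func main() {
	srv := newFakeServer("photos")
	defer srv.Close()
	for _, b := range []string{"photos", "missing"} {
		ok, err := bucketExists(srv.Client(), srv.URL, b)
		fmt.Println(b, ok, err)
	}
}
```

HEAD is a natural fit for this check because the response carries only status and headers, so no body has to be transferred.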

FPutObject is defined in api-put-object-file-context.go:

func (c *Client) FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts PutObjectOptions) (info UploadInfo, err error) {
  // ...
  fileReader, err := os.Open(filePath)
  // ...
  fileStat, err := fileReader.Stat()
  // ...
  fileSize := fileStat.Size()
  // ...
  return c.PutObject(ctx, bucketName, objectName, fileReader, fileSize, opts)
}

api-put-object.go

func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
  opts PutObjectOptions) (info UploadInfo, err error) {
  // ...
  return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)
}

putObjectCommon chooses the upload method from the object size and the type of endpoint URL: a single whole-object upload, a multipart upload, or a streaming upload.

func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
  // ...
  if size > int64(maxMultipartPutObjectSize) {
    // ...
  }
  // ...
  if s3utils.IsGoogleEndpoint(*c.endpointURL) {
    return c.putObject(ctx, bucketName, objectName, reader, size, opts)
  }
  // ...
  if c.overrideSignerType.IsV2() {
    if size >= 0 && size < int64(partSize) || opts.DisableMultipart {
      return c.putObject(ctx, bucketName, objectName, reader, size, opts)
    }
    return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
  }
  // ...
  return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
}

The size check relies on a constant, maxMultipartPutObjectSize, which caps an object at 5 TiB.
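
The branching in the putObjectCommon excerpt can be condensed into a small decision function. The sketch below follows only the branches visible in the excerpt; the target struct, the uploadPlan type, choosePlan, and the maxObjectSize constant are hypothetical names, and the SDK may contain further cases that the excerpt elides.

```go
package main

import (
	"errors"
	"fmt"
)

// maxObjectSize mirrors the 5 TiB cap mentioned in the text.
const maxObjectSize int64 = 5 * 1024 * 1024 * 1024 * 1024

type uploadPlan string

const (
	planSingle    uploadPlan = "single PUT"
	planMultipart uploadPlan = "multipart"
	planStream    uploadPlan = "multipart stream"
)

// target is a hypothetical bundle of the conditions the excerpt inspects.
type target struct {
	googleEndpoint   bool
	signatureV2      bool
	disableMultipart bool
	partSize         int64
}

// choosePlan reproduces the order of checks seen in the excerpt.
func choosePlan(size int64, t target) (uploadPlan, error) {
	if size > maxObjectSize {
		return "", errors.New("object exceeds the maximum allowed size")
	}
	if t.googleEndpoint {
		return planSingle, nil
	}
	if t.signatureV2 {
		if (size >= 0 && size < t.partSize) || t.disableMultipart {
			return planSingle, nil
		}
		return planMultipart, nil
	}
	return planStream, nil
}

func main() {
	v2 := target{signatureV2: true, partSize: 16 << 20}
	for _, size := range []int64{1 << 20, 64 << 20} {
		plan, err := choosePlan(size, v2)
		fmt.Println(size, plan, err)
	}
}
```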

api-put-object-streaming.go

func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
  // ...
  return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
}

func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
  // ...
  resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
  // ...
}

api-put-object-multipart.go

func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
  opts PutObjectOptions) (info UploadInfo, err error) {
  info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
  // ...
  return c.putObject(ctx, bucketName, objectName, reader, size, opts)
  // ...
}

A multipart upload computes the number of parts from the part size, creates an upload ID, uploads each part, and finally completes the upload by merging the parts:

func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
  // ...
  totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
  // ...
  uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
  // ...
  for partNumber <= totalPartsCount {
    // ...
    objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
      md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
    // ...
  }
  // ...
  uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
  // ...
}

func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
  partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) {
  // ...
  resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
  // ...
}

func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
  complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error) {
  // ...
  resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
  // ...
}
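
The part-count arithmetic behind a multipart upload is plain ceiling division. The sketch below assumes the object size is known in advance, whereas the excerpt passes -1 to OptimalPartInfo for an unknown size; partLayout is a hypothetical helper and does not reproduce the SDK's OptimalPartInfo.

```go
package main

import (
	"errors"
	"fmt"
)

// partLayout returns how many parts are needed to upload objectSize bytes in
// chunks of partSize, plus the size of the final (possibly shorter) part.
func partLayout(objectSize, partSize int64) (parts int64, lastPart int64, err error) {
	if objectSize <= 0 || partSize <= 0 {
		return 0, 0, errors.New("sizes must be positive")
	}
	parts = (objectSize + partSize - 1) / partSize // ceiling division
	lastPart = objectSize - (parts-1)*partSize
	return parts, lastPart, nil
}

func main() {
	// A 1 GiB object split into 16 MiB parts.
	parts, last, err := partLayout(1<<30, 16<<20)
	fmt.Println(parts, last, err)
}
```

Only the last part may be shorter than the others, which is why the function reports its size separately.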

Beyond basic uploads, the examples directory contains usage examples for several other APIs:

examples/minio/listen-notification.go

minioClient.ListenNotification(context.Background(), "PREFIX", "SUFFIX", []string{
  "s3:BucketCreated:*",
  "s3:BucketRemoved:*",
  "s3:ObjectCreated:*",
  "s3:ObjectAccessed:*",
  "s3:ObjectRemoved:*",
})

examples/minio/listenbucketnotification.go

minioClient.ListenBucketNotification(context.Background(), "YOUR-BUCKET", "PREFIX", "SUFFIX", []string{
    "s3:ObjectCreated:*",
    "s3:ObjectAccessed:*",
    "s3:ObjectRemoved:*",
  }) 

examples/minio/putobjectsnowball.go

minioClient.ListObjects(context.Background(), YOURBUCKET, lopts)

examples/minio/getbucketreplicationmetrics.go

s3Client.TraceOn(os.Stderr)
m, err := s3Client.GetBucketReplicationMetrics(context.Background(), "bucket")

api-bucket-notification.go

func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info {
  // ...
}

func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info {
  // ...
  resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
    bucketName:       bucketName,
    queryValues:      urlValues,
    contentSHA256Hex: emptySHA256Hex,
  })
  // ...
}

That covers the source code of mc and the SDK. Overall, both are a thin HTTP-client wrapper around the MinIO API, with some parameter validation added.

Originally published on 2021-11-13. This article was shared from the WeChat official account golang算法架构leetcode技术php.
