Support multipart-put benchmarking #383

Open
wants to merge 12 commits into master

34 changes: 34 additions & 0 deletions README.md
@@ -507,6 +507,40 @@ Throughput, split into 9 x 1s:
warp: Cleanup done.
```

## MULTIPART PUT

The multipart put benchmark tests the upload speed of individual parts. It creates a multipart upload, uploads
`--parts` parts of `--part.size` each, and completes the multipart upload once all parts have been uploaded.

The test runs `--concurrent` separate multipart uploads. Each upload is split across `--part.concurrent`
concurrent upload threads, so the total concurrency is `--concurrent` multiplied by `--part.concurrent`.
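For example, running with `--concurrent 20` and `--part.concurrent 20` gives 20 × 20 = 400 simultaneous part
uploads, which is the `Concurrency: 400` shown in the sample report below.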

```
λ warp multipart-put --parts 100 --part.size 5MiB
╭─────────────────────────────────╮
│ WARP S3 Benchmark Tool by MinIO │
╰─────────────────────────────────╯

Benchmarking: Press 'q' to abort benchmark and print partial results...

λ █████████████████████████████████████████████████████████████████████████ 100%

Reqs: 15867, Errs:0, Objs:15867, Bytes: 1983.4MiB
- PUTPART Average: 266 Obj/s, 33.2MiB/s; Current 260 Obj/s, 32.5MiB/s, 1193.7 ms/req

Report: PUTPART. Concurrency: 400. Ran: 58s
* Average: 33.36 MiB/s, 266.85 obj/s
* Reqs: Avg: 1262.5ms, 50%: 935.3ms, 90%: 2773.8ms, 99%: 4395.2ms, Fastest: 53.6ms, Slowest: 6976.4ms, StdDev: 1027.5ms

Throughput, split into 58 x 1s:
* Fastest: 37.9MiB/s, 302.87 obj/s
* 50% Median: 34.3MiB/s, 274.10 obj/s
* Slowest: 19.8MiB/s, 158.41 obj/s


Cleanup Done
```
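
As a rough sketch (the flag values here are illustrative, not tuned recommendations), the same total concurrency of
400 can also be reached with fewer uploads and more part threads per upload:

```
λ warp multipart-put --concurrent 10 --part.concurrent 40 --parts 100 --part.size 5MiB
```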

## ZIP

1 change: 1 addition & 0 deletions cli/cli.go
@@ -98,6 +98,7 @@ func init() {
versionedCmd,
retentionCmd,
multipartCmd,
multipartPutCmd,
zipCmd,
snowballCmd,
fanoutCmd,
86 changes: 86 additions & 0 deletions cli/multipart_put.go
@@ -0,0 +1,86 @@
package cli

import (
"github.com/minio/cli"
"github.com/minio/pkg/v3/console"
"github.com/minio/warp/pkg/bench"
)

var multipartPutFlags = []cli.Flag{
cli.IntFlag{
Name: "parts",
Value: 100,
Usage: "Number of parts to upload for each multipart upload",
},
cli.StringFlag{
Name: "part.size",
Value: "5MiB",
Usage: "Size of each part. Can be a number or MiB/GiB.",
},
cli.IntFlag{
Name: "part.concurrent",
Value: 20,
Usage: "Run this many concurrent operations per each multipart upload. Must not exceed a number of parts.",
},
}

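// MultiPartPutCombinedFlags combines all flag groups used by the multipart-put command.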
var MultiPartPutCombinedFlags = combineFlags(globalFlags, ioFlags, multipartPutFlags, genFlags, benchFlags, analyzeFlags)

// MultipartPut command
var multipartPutCmd = cli.Command{
Name: "multipart-put",
Usage: "benchmark multipart upload",
Action: mainMultipartPut,
Before: setGlobalsFromContext,
Flags: MultiPartPutCombinedFlags,
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}

USAGE:
{{.HelpName}} [FLAGS]
-> see https://github.com/minio/warp#multipart-put

FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}`,
}

// mainMultipartPut is the entry point for the multipart-put command
func mainMultipartPut(ctx *cli.Context) error {
checkMultipartPutSyntax(ctx)

b := &bench.MultipartPut{
Common: getCommon(ctx, newGenSource(ctx, "part.size")),
PartsNumber: ctx.Int("parts"),
PartsConcurrency: ctx.Int("part.concurrent"),
}
return runBench(ctx, b)
}

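// checkMultipartPutSyntax validates the multipart-put flags and aborts on invalid input.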
func checkMultipartPutSyntax(ctx *cli.Context) {
if ctx.NArg() > 0 {
console.Fatal("Command takes no arguments")
}
if ctx.Bool("disable-multipart") {
console.Fatal("cannot disable multipart for multipart-put test")
}

if ctx.Int("parts") > 10000 {
console.Fatal("parts can't be more than 10000")
}
if ctx.Int("parts") <= 0 {
console.Fatal("parts must be at least 1")
}

if ctx.Int("part.concurrent") > ctx.Int("parts") {
console.Fatal("part.concurrent can't be more than parts")
}

sz, err := toSize(ctx.String("part.size"))
if err != nil {
console.Fatal("error parsing part.size:", err)
}
if sz <= 0 {
console.Fatal("part.size must be at least 1")
}
}
180 changes: 180 additions & 0 deletions pkg/bench/multipart_put.go
@@ -0,0 +1,180 @@
package bench

import (
"context"
"errors"
"fmt"
"net/http"
"sort"
"time"

"github.com/minio/minio-go/v7"
"golang.org/x/sync/errgroup"
)

// MultipartPut benchmarks multipart upload speed.
type MultipartPut struct {
Common

PartsNumber int
PartsConcurrency int
}

// Prepare for the benchmark run
func (g *MultipartPut) Prepare(ctx context.Context) error {
return g.createEmptyBucket(ctx)
}

// Start will execute the main benchmark.
// Operations should begin executing when the start channel is closed.
func (g *MultipartPut) Start(ctx context.Context, wait chan struct{}) error {
eg, ctx := errgroup.WithContext(ctx)
c := g.Collector
if g.AutoTermDur > 0 {
ctx = c.AutoTerm(ctx, http.MethodPut, g.AutoTermScale, autoTermCheck, autoTermSamples, g.AutoTermDur)
}

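// Each of the g.Concurrency workers repeatedly creates a multipart upload, uploads all of its parts,
// and completes it, until the benchmark context is cancelled.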
for i := 0; i < g.Concurrency; i++ {
thread := uint16(i)
eg.Go(func() error {
<-wait

for ctx.Err() == nil {
objectName := g.Source().Object().Name

uploadID, err := g.createMultipartUpload(ctx, objectName)
if errors.Is(err, context.Canceled) {
return nil
}
if err != nil {
g.Error("create multipart upload error:", err)
continue
}

parts, err := g.uploadParts(ctx, thread, objectName, uploadID)
if errors.Is(err, context.Canceled) {
return nil
}
if err != nil {
g.Error("upload parts error:", err)
continue
}

err = g.completeMultipartUpload(ctx, objectName, uploadID, parts)
if err != nil {
g.Error("complete multipart upload error:", err)
}
}
return nil
})
}
return eg.Wait()
}

// Cleanup cleans up after the benchmark run.
func (g *MultipartPut) Cleanup(ctx context.Context) {
g.deleteAllInBucket(ctx, "")
}

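// createMultipartUpload starts a new multipart upload for objectName and returns its upload ID.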
func (g *MultipartPut) createMultipartUpload(ctx context.Context, objectName string) (string, error) {
if err := g.rpsLimit(ctx); err != nil {
return "", err
}

// Non-terminating context.
nonTerm := context.Background()

client, done := g.Client()
defer done()
c := minio.Core{Client: client}
return c.NewMultipartUpload(nonTerm, g.Bucket, objectName, g.PutOpts)
}

func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName string, uploadID string) ([]minio.CompletePart, error) {
// Queue all part numbers (1-based) in a buffered channel that the upload workers drain.
partIdxCh := make(chan int, g.PartsNumber)
for i := 0; i < g.PartsNumber; i++ {
partIdxCh <- i + 1
}
close(partIdxCh)

// Collect the part number and ETag of each uploaded part so the upload can be completed afterwards.
completedCh := make(chan minio.CompletePart, g.PartsNumber)

eg, ctx := errgroup.WithContext(ctx)

// Non-terminating context.
nonTerm := context.Background()

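// Spawn PartsConcurrency workers that drain the part queue; each records its operations under a distinct collector thread ID.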
for i := 0; i < g.PartsConcurrency; i++ {
i := i // capture the loop variable before the goroutine starts (required before Go 1.22)
eg.Go(func() error {
for ctx.Err() == nil {
var partIdx int
var ok bool
select {
case partIdx, ok = <-partIdxCh:
if !ok {
return nil
}
case <-ctx.Done():
continue
}

if err := g.rpsLimit(ctx); err != nil {
return err
}

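// Generate the payload for this part and prepare an operation record for the collector.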
obj := g.Source().Object()
client, done := g.Client()
defer done()
core := minio.Core{Client: client}
op := Operation{
OpType: "PUTPART",
Thread: thread*uint16(g.PartsConcurrency) + uint16(i),
Size: obj.Size,
File: obj.Name,
ObjPerOp: 1,
Endpoint: client.EndpointURL().String(),
}
if g.DiscardOutput {
op.File = ""
}

opts := minio.PutObjectPartOptions{
SSE: g.Common.PutOpts.ServerSideEncryption,
DisableContentSha256: g.PutOpts.DisableContentSha256,
}

op.Start = time.Now()
res, err := core.PutObjectPart(nonTerm, g.Bucket, objectName, uploadID, partIdx, obj.Reader, obj.Size, opts)
op.End = time.Now()
if err != nil {
err := fmt.Errorf("upload error: %w", err)
g.Error(err)
return err
}

if res.Size != obj.Size {
errMsg := fmt.Sprint("short upload. want:", obj.Size, ", got:", res.Size)
if op.Err == "" {
op.Err = errMsg
}
g.Error(errMsg)
}

// Record the uploaded part (part number and ETag) for the completion step and hand the operation to the collector.
completedCh <- minio.CompletePart{PartNumber: partIdx, ETag: res.ETag}
g.Collector.Receiver() <- op
}

return nil
})
}

if err := eg.Wait(); err != nil {
return nil, err
}
if ctx.Err() != nil {
return nil, ctx.Err()
}
close(completedCh)

parts := make([]minio.CompletePart, 0, g.PartsNumber)
for part := range completedCh {
parts = append(parts, part)
}
// CompleteMultipartUpload expects the parts sorted by part number.
sort.Slice(parts, func(i, j int) bool { return parts[i].PartNumber < parts[j].PartNumber })
return parts, nil
}

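// completeMultipartUpload finishes the multipart upload using the parts collected by uploadParts.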
func (g *MultipartPut) completeMultipartUpload(_ context.Context, objectName string, uploadID string, parts []minio.CompletePart) error {
// Non-terminating context.
nonTerm := context.Background()

cl, done := g.Client()
c := minio.Core{Client: cl}
defer done()
_, err := c.CompleteMultipartUpload(nonTerm, g.Bucket, objectName, uploadID, parts, g.PutOpts)
return err
}