Files
eiblog/vendor/github.com/qiniu/api.v7/kodocli/resumable.go
2017-07-11 23:50:01 +08:00

329 lines
9.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package kodocli
import (
. "context"
"encoding/base64"
"encoding/json"
"errors"
"io"
"os"
"strings"
"sync"
"github.com/qiniu/x/xlog.v7"
)
// ----------------------------------------------------------
var (
ErrInvalidPutProgress = errors.New("invalid put progress")
ErrPutFailed = errors.New("resumable put failed")
ErrUnmatchedChecksum = errors.New("unmatched checksum")
ErrBadToken = errors.New("invalid token")
)
const (
InvalidCtx = 701 // UP: 无效的上下文(bput)可能情况Ctx非法或者已经被淘汰太久未使用
)
const (
defaultWorkers = 4
defaultChunkSize = 1 * 1024 * 1024 // 1M
defaultTryTimes = 3
)
type Settings struct {
TaskQsize int // 可选。任务队列大小。为 0 表示取 Workers * 4。
Workers int // 并行 Goroutine 数目。
ChunkSize int // 默认的Chunk大小不设定则为1M
TryTimes int // 默认的尝试次数不设定则为3
}
var settings = Settings{
TaskQsize: defaultWorkers * 4,
Workers: defaultWorkers,
ChunkSize: defaultChunkSize,
TryTimes: defaultTryTimes,
}
func SetSettings(v *Settings) {
settings = *v
if settings.Workers == 0 {
settings.Workers = defaultWorkers
}
if settings.TaskQsize == 0 {
settings.TaskQsize = settings.Workers * 4
}
if settings.ChunkSize == 0 {
settings.ChunkSize = defaultChunkSize
}
if settings.TryTimes == 0 {
settings.TryTimes = defaultTryTimes
}
}
// ----------------------------------------------------------
var tasks chan func()
func worker(tasks chan func()) {
for {
task := <-tasks
task()
}
}
func initWorkers() {
tasks = make(chan func(), settings.TaskQsize)
for i := 0; i < settings.Workers; i++ {
go worker(tasks)
}
}
func notifyNil(blkIdx int, blkSize int, ret *BlkputRet) {}
func notifyErrNil(blkIdx int, blkSize int, err error) {}
// ----------------------------------------------------------
const (
blockBits = 22
blockMask = (1 << blockBits) - 1
)
func BlockCount(fsize int64) int {
return int((fsize + blockMask) >> blockBits)
}
// ----------------------------------------------------------
type BlkputRet struct {
Ctx string `json:"ctx"`
Checksum string `json:"checksum"`
Crc32 uint32 `json:"crc32"`
Offset uint32 `json:"offset"`
Host string `json:"host"`
}
type RputExtra struct {
Params map[string]string // 可选。用户自定义参数,以"x:"开头 否则忽略
MimeType string // 可选。
ChunkSize int // 可选。每次上传的Chunk大小
TryTimes int // 可选。尝试次数
Progresses []BlkputRet // 可选。上传进度
Notify func(blkIdx int, blkSize int, ret *BlkputRet) // 可选。进度提示注意多个block是并行传输的
NotifyErr func(blkIdx int, blkSize int, err error)
}
var once sync.Once
// ----------------------------------------------------------
type Policy struct {
Scope string `json:"scope"`
UpHosts []string `json:"uphosts"`
}
func unmarshal(uptoken string, uptokenPolicy *Policy) (err error) {
parts := strings.Split(uptoken, ":")
if len(parts) != 3 {
err = ErrBadToken
return
}
b, err := base64.URLEncoding.DecodeString(parts[2])
if err != nil {
return err
}
return json.Unmarshal(b, uptokenPolicy)
}
func (p Uploader) getUpHostFromToken(uptoken string) (uphosts []string, err error) {
if len(p.UpHosts) != 0 {
uphosts = p.UpHosts
return
}
ak := strings.Split(uptoken, ":")[0]
uptokenPolicy := Policy{}
err = unmarshal(uptoken, &uptokenPolicy)
if err != nil {
return
}
if len(uptokenPolicy.UpHosts) == 0 {
bucketName := strings.Split(uptokenPolicy.Scope, ":")[0]
bucketInfo, err1 := p.ApiCli.GetBucketInfo(ak, bucketName)
if err1 != nil {
err = err1
return
}
uphosts = bucketInfo.UpHosts
} else {
uphosts = uptokenPolicy.UpHosts
}
return
}
// 上传一个文件,支持断点续传和分块上传。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 CallbackUrl 或 ReturnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// key 是要上传的文件访问路径。比如:"foo/bar.jpg"。注意我们建议 key 不要以 '/' 开头。另外key 为空字符串是合法的。
// f 是文件内容的访问接口。考虑到需要支持分块上传和断点续传,要的是 io.ReaderAt 接口,而不是 io.Reader。
// fsize 是要上传的文件大小。
// extra 是上传的一些可选项。详细见 RputExtra 结构的描述。
//
func (p Uploader) Rput(
ctx Context, ret interface{}, uptoken string,
key string, f io.ReaderAt, fsize int64, extra *RputExtra) error {
return p.rput(ctx, ret, uptoken, key, true, f, fsize, extra)
}
// 上传一个文件支持断点续传和分块上传。文件的访问路径key自动生成。
// 如果 uptoken 中设置了 SaveKey那么按 SaveKey 要求的规则生成 key否则自动以文件的 hash 做 key。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 CallbackUrl 或 ReturnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// f 是文件内容的访问接口。考虑到需要支持分块上传和断点续传,要的是 io.ReaderAt 接口,而不是 io.Reader。
// fsize 是要上传的文件大小。
// extra 是上传的一些可选项。详细见 RputExtra 结构的描述。
//
func (p Uploader) RputWithoutKey(
ctx Context, ret interface{}, uptoken string, f io.ReaderAt, fsize int64, extra *RputExtra) error {
return p.rput(ctx, ret, uptoken, "", false, f, fsize, extra)
}
// 上传一个文件,支持断点续传和分块上传。
// 和 Rput 不同的只是一个通过提供文件路径来访问文件内容,一个通过 io.ReaderAt 来访问。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 CallbackUrl 或 ReturnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// key 是要上传的文件访问路径。比如:"foo/bar.jpg"。注意我们建议 key 不要以 '/' 开头。另外key 为空字符串是合法的。
// localFile 是要上传的文件的本地路径。
// extra 是上传的一些可选项。详细见 RputExtra 结构的描述。
//
func (p Uploader) RputFile(
ctx Context, ret interface{}, uptoken, key, localFile string, extra *RputExtra) (err error) {
return p.rputFile(ctx, ret, uptoken, key, true, localFile, extra)
}
// 上传一个文件支持断点续传和分块上传。文件的访问路径key自动生成。
// 如果 uptoken 中设置了 SaveKey那么按 SaveKey 要求的规则生成 key否则自动以文件的 hash 做 key。
// 和 RputWithoutKey 不同的只是一个通过提供文件路径来访问文件内容,一个通过 io.ReaderAt 来访问。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 CallbackUrl 或 ReturnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// localFile 是要上传的文件的本地路径。
// extra 是上传的一些可选项。详细见 RputExtra 结构的描述。
//
func (p Uploader) RputFileWithoutKey(
ctx Context, ret interface{}, uptoken, localFile string, extra *RputExtra) (err error) {
return p.rputFile(ctx, ret, uptoken, "", false, localFile, extra)
}
// ----------------------------------------------------------
func (p Uploader) rput(
ctx Context, ret interface{}, uptoken string,
key string, hasKey bool, f io.ReaderAt, fsize int64, extra *RputExtra) error {
once.Do(initWorkers)
log := xlog.NewWith(ctx)
blockCnt := BlockCount(fsize)
if extra == nil {
extra = new(RputExtra)
}
if extra.Progresses == nil {
extra.Progresses = make([]BlkputRet, blockCnt)
} else if len(extra.Progresses) != blockCnt {
return ErrInvalidPutProgress
}
if extra.ChunkSize == 0 {
extra.ChunkSize = settings.ChunkSize
}
if extra.TryTimes == 0 {
extra.TryTimes = settings.TryTimes
}
if extra.Notify == nil {
extra.Notify = notifyNil
}
if extra.NotifyErr == nil {
extra.NotifyErr = notifyErrNil
}
uphosts, err := p.getUpHostFromToken(uptoken)
if err != nil {
return err
}
var wg sync.WaitGroup
wg.Add(blockCnt)
last := blockCnt - 1
blkSize := 1 << blockBits
nfails := 0
p.Conn.Client = newUptokenClient(uptoken, p.Conn.Transport)
for i := 0; i < blockCnt; i++ {
blkIdx := i
blkSize1 := blkSize
if i == last {
offbase := int64(blkIdx) << blockBits
blkSize1 = int(fsize - offbase)
}
task := func() {
defer wg.Done()
tryTimes := extra.TryTimes
lzRetry:
err := p.resumableBput(ctx, uphosts, &extra.Progresses[blkIdx], f, blkIdx, blkSize1, extra)
if err != nil {
if tryTimes > 1 {
tryTimes--
log.Info("resumable.Put retrying ...", blkIdx, "reason:", err)
goto lzRetry
}
log.Warn("resumable.Put", blkIdx, "failed:", err)
extra.NotifyErr(blkIdx, blkSize1, err)
nfails++
}
}
tasks <- task
}
wg.Wait()
if nfails != 0 {
return ErrPutFailed
}
return p.mkfile(ctx, uphosts, ret, key, hasKey, fsize, extra)
}
func (p Uploader) rputFile(
ctx Context, ret interface{}, uptoken string,
key string, hasKey bool, localFile string, extra *RputExtra) (err error) {
f, err := os.Open(localFile)
if err != nil {
return
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return
}
return p.rput(ctx, ret, uptoken, key, hasKey, f, fi.Size(), extra)
}
// ----------------------------------------------------------