使用github的七牛SDK,配置名称Kodo->Qiniu

This commit is contained in:
deepzz0
2017-11-05 12:27:22 +08:00
parent c9fc0cc75a
commit 360204995d
429 changed files with 26939 additions and 14206 deletions

172
vendor/github.com/qiniu/api.v7/storage/base64_upload.go generated vendored Normal file
View File

@@ -0,0 +1,172 @@
package storage
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"hash/crc32"
"io"
"strconv"
"strings"
"github.com/qiniu/x/rpc.v7"
)
// Base64Uploader uploads files whose content is supplied as base64-encoded data.
type Base64Uploader struct {
	client *rpc.Client // RPC client used to issue the upload request
	cfg    *Config     // upload configuration (zone, https, cdn preferences)
}
// NewBase64Uploader creates a Base64Uploader backed by the default RPC
// client. A nil cfg is treated the same as an empty Config.
func NewBase64Uploader(cfg *Config) *Base64Uploader {
	if cfg == nil {
		cfg = &Config{}
	}
	uploader := Base64Uploader{
		client: &rpc.DefaultClient,
		cfg:    cfg,
	}
	return &uploader
}
// NewBase64UploaderEx creates a Base64Uploader with a caller-supplied RPC
// client. Nil arguments fall back to an empty Config and the default client.
func NewBase64UploaderEx(cfg *Config, client *rpc.Client) *Base64Uploader {
	if cfg == nil {
		cfg = &Config{}
	}
	c := client
	if c == nil {
		c = &rpc.DefaultClient
	}
	return &Base64Uploader{cfg: cfg, client: c}
}
// Base64PutExtra carries the optional settings of a base64 upload.
type Base64PutExtra struct {
	// Optional. User-defined variables; keys must start with "x:".
	// Entries whose key lacks that prefix are ignored.
	Params map[string]string

	// Optional. MIME type of the content; when empty the server
	// auto-detects it.
	MimeType string
}
// Put uploads a file supplied as base64-encoded data.
//
// ctx is the request context.
// ret receives the response data. Unless the uptoken customizes callbackUrl
// or returnBody, the response unmarshals into a PutRet.
// uptoken is the upload credential issued by the business server.
// key is the storage path of the file, e.g. "foo/bar.jpg". Keys should not
// begin with '/'; an empty key is legal.
// base64Data is the base64-encoded content, typically of an image.
// extra holds optional settings and may be nil; see Base64PutExtra.
//
func (p *Base64Uploader) Put(
	ctx context.Context, ret interface{}, uptoken, key string, base64Data []byte, extra *Base64PutExtra) (err error) {
	return p.put(ctx, ret, uptoken, key, true, base64Data, extra)
}
// PutWithoutKey uploads base64 data without specifying a key; the stored
// file is named after the hash of its content.
func (p *Base64Uploader) PutWithoutKey(
	ctx context.Context, ret interface{}, uptoken string, base64Data []byte, extra *Base64PutExtra) (err error) {
	return p.put(ctx, ret, uptoken, "", false, base64Data, extra)
}
// put uploads base64-encoded data to the up host resolved from the upload
// token. hasKey selects whether the target key is included in the request
// path; extra may be nil, in which case defaults are used.
func (p *Base64Uploader) put(
	ctx context.Context, ret interface{}, uptoken, key string, hasKey bool, base64Data []byte, extra *Base64PutExtra) (err error) {
	// Resolve the upload host from the credentials embedded in the token.
	ak, bucket, gErr := getAkBucketFromUploadToken(uptoken)
	if gErr != nil {
		err = gErr
		return
	}
	var upHost string
	upHost, err = p.upHost(ak, bucket)
	if err != nil {
		return
	}
	// Use default options when the caller supplied none.
	if extra == nil {
		extra = &Base64PutExtra{}
	}
	// Decode the base64 payload once to learn the raw size and its CRC32.
	h := crc32.NewIEEE()
	rawReader := base64.NewDecoder(base64.StdEncoding, bytes.NewReader(base64Data))
	fsize, decodeErr := io.Copy(h, rawReader)
	if decodeErr != nil {
		err = fmt.Errorf("invalid base64 data, %s", decodeErr.Error())
		return
	}
	fCrc32 := h.Sum32()
	postPath := bytes.NewBufferString("/putb64")
	// Raw (decoded) size. FormatInt avoids truncating the int64 size on
	// 32-bit builds, unlike strconv.Itoa(int(fsize)).
	postPath.WriteString("/")
	postPath.WriteString(strconv.FormatInt(fsize, 10))
	// Optional target key, URL-safe base64 encoded.
	if hasKey {
		postPath.WriteString("/key/")
		postPath.WriteString(base64.URLEncoding.EncodeToString([]byte(key)))
	}
	// Optional MIME type override.
	if extra.MimeType != "" {
		postPath.WriteString("/mimeType/")
		postPath.WriteString(base64.URLEncoding.EncodeToString([]byte(extra.MimeType)))
	}
	// CRC32 of the decoded payload for server-side integrity checking.
	postPath.WriteString("/crc32/")
	postPath.WriteString(strconv.FormatUint(uint64(fCrc32), 10))
	// Custom user variables; only "x:"-prefixed keys with non-empty values
	// are forwarded. Ranging over a nil map is a no-op, so no length guard
	// is needed.
	for k, v := range extra.Params {
		if strings.HasPrefix(k, "x:") && v != "" {
			postPath.WriteString("/")
			postPath.WriteString(k)
			postPath.WriteString("/")
			postPath.WriteString(base64.URLEncoding.EncodeToString([]byte(v)))
		}
	}
	postURL := fmt.Sprintf("%s%s", upHost, postPath.String())
	postClient := newUptokenClient(p.client, uptoken)
	return postClient.CallWith(ctx, ret, "POST", postURL, "application/octet-stream",
		bytes.NewReader(base64Data), len(base64Data))
}
// upHost resolves the upload host for ak/bucket, preferring the zone from
// the configuration and querying the zone service only when none is set.
// Scheme and domain choice follow the UseHTTPS / UseCdnDomains settings.
func (p *Base64Uploader) upHost(ak, bucket string) (upHost string, err error) {
	// Early-return style instead of else-after-return (golint idiom).
	zone := p.cfg.Zone
	if zone == nil {
		v, zoneErr := GetZone(ak, bucket)
		if zoneErr != nil {
			err = zoneErr
			return
		}
		zone = v
	}
	scheme := "http://"
	if p.cfg.UseHTTPS {
		scheme = "https://"
	}
	host := zone.SrcUpHosts[0]
	if p.cfg.UseCdnDomains {
		host = zone.CdnUpHosts[0]
	}
	upHost = fmt.Sprintf("%s%s", scheme, host)
	return
}

File diff suppressed because one or more lines are too long

553
vendor/github.com/qiniu/api.v7/storage/bucket.go generated vendored Normal file
View File

@@ -0,0 +1,553 @@
package storage
import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"github.com/qiniu/api.v7/auth/qbox"
"github.com/qiniu/x/rpc.v7"
)
// Default domains for the resource-management services.
const (
	DefaultRsHost  = "rs.qiniu.com"  // resource management (bucket listing, batch)
	DefaultRsfHost = "rsf.qiniu.com" // resource listing service
	DefaultAPIHost = "api.qiniu.com" // general API host (not referenced in this file)
	DefaultPubHost = "pu.qbox.me:10200" // bucket image (mirror source) settings
)
// FileInfo holds the basic attributes of a stored file.
type FileInfo struct {
	Hash     string `json:"hash"`     // content hash (etag)
	Fsize    int64  `json:"fsize"`    // file size in bytes
	PutTime  int64  `json:"putTime"`  // upload timestamp
	MimeType string `json:"mimeType"` // MIME type
	Type     int    `json:"type"`     // storage type: 0 normal, 1 infrequent access
}

// String renders the file info as one "Name: value" line per field.
// A single Sprintf replaces the original repeated string concatenation,
// which allocated a new string per field.
func (f *FileInfo) String() string {
	return fmt.Sprintf("Hash: %s\nFsize: %d\nPutTime: %d\nMimeType: %s\nType: %d\n",
		f.Hash, f.Fsize, f.PutTime, f.MimeType, f.Type)
}
// FetchRet is the response of a resource fetch request.
type FetchRet struct {
	Hash     string `json:"hash"`
	Fsize    int64  `json:"fsize"`
	MimeType string `json:"mimeType"`
	Key      string `json:"key"`
}

// String renders the fetch result as one "Name: value" line per field,
// in the order Key, Hash, Fsize, MimeType.
func (r *FetchRet) String() string {
	return fmt.Sprintf("Key: %s\nHash: %s\nFsize: %d\nMimeType: %s\n",
		r.Key, r.Hash, r.Fsize, r.MimeType)
}
// ListItem is a single entry returned by a file listing.
type ListItem struct {
	Key      string `json:"key"`
	Hash     string `json:"hash"`
	Fsize    int64  `json:"fsize"`
	PutTime  int64  `json:"putTime"`
	MimeType string `json:"mimeType"`
	Type     int    `json:"type"`
	EndUser  string `json:"endUser"`
}

// String renders the list entry as one "Name: value" line per field.
// A single Sprintf replaces the original repeated += concatenation,
// which allocated a new string per field.
func (l *ListItem) String() string {
	return fmt.Sprintf("Hash: %s\nFsize: %d\nPutTime: %d\nMimeType: %s\nType: %d\nEndUser: %s\n",
		l.Hash, l.Fsize, l.PutTime, l.MimeType, l.Type, l.EndUser)
}
// BatchOpRet is the per-operation result of a Batch call.
// Batch supports the stat, copy, delete, move, chgm, chtype and
// deleteAfterDays operations.
// For stat, Data carries the file's basic info when the file exists, and the
// Error field when it does not. For the other operations, Code is returned on
// success; on failure Error is also filled in to indicate the cause.
type BatchOpRet struct {
	Code int `json:"code,omitempty"`
	Data struct {
		Hash     string `json:"hash"`
		Fsize    int64  `json:"fsize"`
		PutTime  int64  `json:"putTime"`
		MimeType string `json:"mimeType"`
		Type     int    `json:"type"`
		Error    string `json:"error"`
	} `json:"data,omitempty"`
}
// BucketManager 提供了对资源进行管理的操作
type BucketManager struct {
client *rpc.Client
mac *qbox.Mac
cfg *Config
}
// NewBucketManager creates a resource manager authenticated with mac.
// A nil cfg is treated the same as an empty Config.
func NewBucketManager(mac *qbox.Mac, cfg *Config) *BucketManager {
	if cfg == nil {
		cfg = &Config{}
	}
	manager := BucketManager{
		client: NewClient(mac, nil),
		mac:    mac,
		cfg:    cfg,
	}
	return &manager
}
// NewBucketManagerEx creates a resource manager using a caller-supplied RPC
// client; nil arguments fall back to an empty Config and a default client.
func NewBucketManagerEx(mac *qbox.Mac, cfg *Config, client *rpc.Client) *BucketManager {
	if cfg == nil {
		cfg = &Config{}
	}
	c := client
	if c == nil {
		c = NewClient(mac, nil)
	}
	return &BucketManager{
		client: c,
		mac:    mac,
		cfg:    cfg,
	}
}
// Buckets lists the bucket names of the account; when shared is true,
// buckets granted to this account by others are included as well.
func (m *BucketManager) Buckets(shared bool) (buckets []string, err error) {
	ctx := context.TODO()
	var reqHost string
	scheme := "http://"
	if m.cfg.UseHTTPS {
		scheme = "https://"
	}
	// Bucket listing always targets the default RS host; it is not
	// zone-specific.
	reqHost = fmt.Sprintf("%s%s", scheme, DefaultRsHost)
	reqURL := fmt.Sprintf("%s/buckets?shared=%v", reqHost, shared)
	err = m.client.Call(ctx, &buckets, "POST", reqURL)
	return
}
// Stat 用来获取一个文件的基本信息
func (m *BucketManager) Stat(bucket, key string) (info FileInfo, err error) {
ctx := context.TODO()
reqHost, reqErr := m.rsHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, URIStat(bucket, key))
err = m.client.Call(ctx, &info, "POST", reqURL)
return
}
// Delete 用来删除空间中的一个文件
func (m *BucketManager) Delete(bucket, key string) (err error) {
ctx := context.TODO()
reqHost, reqErr := m.rsHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, URIDelete(bucket, key))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// Copy 用来创建已有空间中的文件的一个新的副本
func (m *BucketManager) Copy(srcBucket, srcKey, destBucket, destKey string, force bool) (err error) {
ctx := context.TODO()
reqHost, reqErr := m.rsHost(srcBucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, URICopy(srcBucket, srcKey, destBucket, destKey, force))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// Move 用来将空间中的一个文件移动到新的空间或者重命名
func (m *BucketManager) Move(srcBucket, srcKey, destBucket, destKey string, force bool) (err error) {
ctx := context.TODO()
reqHost, reqErr := m.rsHost(srcBucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, URIMove(srcBucket, srcKey, destBucket, destKey, force))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// ChangeMime 用来更新文件的MimeType
func (m *BucketManager) ChangeMime(bucket, key, newMime string) (err error) {
ctx := context.TODO()
reqHost, reqErr := m.rsHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, URIChangeMime(bucket, key, newMime))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// ChangeType 用来更新文件的存储类型0表示普通存储1表示低频存储
func (m *BucketManager) ChangeType(bucket, key string, fileType int) (err error) {
ctx := context.TODO()
reqHost, reqErr := m.rsHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, URIChangeType(bucket, key, fileType))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// DeleteAfterDays 用来更新文件生命周期,如果 days 设置为0则表示取消文件的定期删除功能永久存储
func (m *BucketManager) DeleteAfterDays(bucket, key string, days int) (err error) {
ctx := context.TODO()
reqHost, reqErr := m.rsHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, URIDeleteAfterDays(bucket, key, days))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// Batch executes up to 1000 management operations (stat, copy, move, delete,
// chgm, chtype, deleteAfterDays) in a single request; build each operation
// string with the corresponding URIXxx helper.
func (m *BucketManager) Batch(operations []string) (batchOpRet []BatchOpRet, err error) {
	if len(operations) > 1000 {
		err = errors.New("batch operation count exceeds the limit of 1000")
		return
	}
	ctx := context.TODO()
	scheme := "http://"
	if m.cfg.UseHTTPS {
		scheme = "https://"
	}
	// Batch requests always target the default RS host.
	reqURL := fmt.Sprintf("%s%s/batch", scheme, DefaultRsHost)
	params := map[string][]string{
		"op": operations,
	}
	err = m.client.CallWithForm(ctx, &batchOpRet, "POST", reqURL, params)
	return
}
// Fetch 根据提供的远程资源链接来抓取一个文件到空间并已指定文件名保存
func (m *BucketManager) Fetch(resURL, bucket, key string) (fetchRet FetchRet, err error) {
ctx := context.TODO()
reqHost, reqErr := m.iovipHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, uriFetch(resURL, bucket, key))
err = m.client.Call(ctx, &fetchRet, "POST", reqURL)
return
}
// FetchWithoutKey 根据提供的远程资源链接来抓取一个文件到空间并以文件的内容hash作为文件名
func (m *BucketManager) FetchWithoutKey(resURL, bucket string) (fetchRet FetchRet, err error) {
ctx := context.TODO()
reqHost, reqErr := m.iovipHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, uriFetchWithoutKey(resURL, bucket))
err = m.client.Call(ctx, &fetchRet, "POST", reqURL)
return
}
// Prefetch 用来同步镜像空间的资源和镜像源资源内容
func (m *BucketManager) Prefetch(bucket, key string) (err error) {
ctx := context.TODO()
reqHost, reqErr := m.iovipHost(bucket)
if reqErr != nil {
err = reqErr
return
}
reqURL := fmt.Sprintf("%s%s", reqHost, uriPrefetch(bucket, key))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// SetImage 用来设置空间镜像源
func (m *BucketManager) SetImage(siteURL, bucket string) (err error) {
ctx := context.TODO()
reqURL := fmt.Sprintf("http://%s%s", DefaultPubHost, uriSetImage(siteURL, bucket))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// SetImageWithHost 用来设置空间镜像源额外添加回源Host头部
func (m *BucketManager) SetImageWithHost(siteURL, bucket, host string) (err error) {
ctx := context.TODO()
reqURL := fmt.Sprintf("http://%s%s", DefaultPubHost,
uriSetImageWithHost(siteURL, bucket, host))
err = m.client.Call(ctx, nil, "POST", reqURL)
return
}
// UnsetImage 用来取消空间镜像源设置
func (m *BucketManager) UnsetImage(bucket string) (err error) {
ctx := context.TODO()
reqURL := fmt.Sprintf("http://%s%s", DefaultPubHost, uriUnsetImage(bucket))
err = m.client.Call(ctx, nil, "POST", reqURL)
return err
}
type listFilesRet struct {
Marker string `json:"marker"`
Items []ListItem `json:"items"`
CommonPrefixes []string `json:"commonPrefixes"`
}
// ListFiles lists files in a bucket. prefix filters by key prefix, delimiter
// groups keys into common prefixes (directory-style listing), marker resumes
// a previous listing, and limit caps the page size; limit must be in
// [1, 1000]. When hasNext is true, pass nextMarker as marker to fetch the
// next page.
func (m *BucketManager) ListFiles(bucket, prefix, delimiter, marker string,
	limit int) (entries []ListItem, commonPrefixes []string, nextMarker string, hasNext bool, err error) {
	if limit <= 0 || limit > 1000 {
		err = errors.New("invalid list limit, only allow [1, 1000]")
		return
	}
	ctx := context.TODO()
	reqHost, reqErr := m.rsfHost(bucket)
	if reqErr != nil {
		err = reqErr
		return
	}
	ret := listFilesRet{}
	reqURL := fmt.Sprintf("%s%s", reqHost, uriListFiles(bucket, prefix, delimiter, marker, limit))
	err = m.client.Call(ctx, &ret, "POST", reqURL)
	if err != nil {
		return
	}
	commonPrefixes = ret.CommonPrefixes
	nextMarker = ret.Marker
	entries = ret.Items
	// A non-empty marker means the listing was truncated and more pages exist.
	if ret.Marker != "" {
		hasNext = true
	}
	return
}
// rsHost resolves the RS (resource management) host for bucket, using the
// configured zone when set and querying the zone service otherwise.
func (m *BucketManager) rsHost(bucket string) (rsHost string, err error) {
	// Early-return style instead of else-after-return (golint idiom).
	zone := m.cfg.Zone
	if zone == nil {
		v, zoneErr := GetZone(m.mac.AccessKey, bucket)
		if zoneErr != nil {
			err = zoneErr
			return
		}
		zone = v
	}
	scheme := "http://"
	if m.cfg.UseHTTPS {
		scheme = "https://"
	}
	rsHost = fmt.Sprintf("%s%s", scheme, zone.RsHost)
	return
}
// rsfHost resolves the RSF (listing) host for bucket, using the configured
// zone when set and querying the zone service otherwise.
func (m *BucketManager) rsfHost(bucket string) (rsfHost string, err error) {
	// Early-return style instead of else-after-return (golint idiom).
	zone := m.cfg.Zone
	if zone == nil {
		v, zoneErr := GetZone(m.mac.AccessKey, bucket)
		if zoneErr != nil {
			err = zoneErr
			return
		}
		zone = v
	}
	scheme := "http://"
	if m.cfg.UseHTTPS {
		scheme = "https://"
	}
	rsfHost = fmt.Sprintf("%s%s", scheme, zone.RsfHost)
	return
}
// iovipHost resolves the IO VIP host (fetch/prefetch) for bucket, using the
// configured zone when set and querying the zone service otherwise.
func (m *BucketManager) iovipHost(bucket string) (iovipHost string, err error) {
	// Early-return style instead of else-after-return (golint idiom).
	zone := m.cfg.Zone
	if zone == nil {
		v, zoneErr := GetZone(m.mac.AccessKey, bucket)
		if zoneErr != nil {
			err = zoneErr
			return
		}
		zone = v
	}
	scheme := "http://"
	if m.cfg.UseHTTPS {
		scheme = "https://"
	}
	iovipHost = fmt.Sprintf("%s%s", scheme, zone.IovipHost)
	return
}
// 构建op的方法导出的方法支持在Batch操作中使用
// URIStat 构建 stat 接口的请求命令
func URIStat(bucket, key string) string {
return fmt.Sprintf("/stat/%s", EncodedEntry(bucket, key))
}
// URIDelete 构建 delete 接口的请求命令
func URIDelete(bucket, key string) string {
return fmt.Sprintf("/delete/%s", EncodedEntry(bucket, key))
}
// URICopy 构建 copy 接口的请求命令
func URICopy(srcBucket, srcKey, destBucket, destKey string, force bool) string {
return fmt.Sprintf("/copy/%s/%s/force/%v", EncodedEntry(srcBucket, srcKey),
EncodedEntry(destBucket, destKey), force)
}
// URIMove 构建 move 接口的请求命令
func URIMove(srcBucket, srcKey, destBucket, destKey string, force bool) string {
return fmt.Sprintf("/move/%s/%s/force/%v", EncodedEntry(srcBucket, srcKey),
EncodedEntry(destBucket, destKey), force)
}
// URIDeleteAfterDays 构建 deleteAfterDays 接口的请求命令
func URIDeleteAfterDays(bucket, key string, days int) string {
return fmt.Sprintf("/deleteAfterDays/%s/%d", EncodedEntry(bucket, key), days)
}
// URIChangeMime 构建 chgm 接口的请求命令
func URIChangeMime(bucket, key, newMime string) string {
return fmt.Sprintf("/chgm/%s/mime/%s", EncodedEntry(bucket, key),
base64.URLEncoding.EncodeToString([]byte(newMime)))
}
// URIChangeType 构建 chtype 接口的请求命令
func URIChangeType(bucket, key string, fileType int) string {
return fmt.Sprintf("/chtype/%s/type/%d", EncodedEntry(bucket, key), fileType)
}
// 构建op的方法非导出的方法无法用在Batch操作中
func uriFetch(resURL, bucket, key string) string {
return fmt.Sprintf("/fetch/%s/to/%s",
base64.URLEncoding.EncodeToString([]byte(resURL)), EncodedEntry(bucket, key))
}
func uriFetchWithoutKey(resURL, bucket string) string {
return fmt.Sprintf("/fetch/%s/to/%s",
base64.URLEncoding.EncodeToString([]byte(resURL)), EncodedEntryWithoutKey(bucket))
}
func uriPrefetch(bucket, key string) string {
return fmt.Sprintf("/prefetch/%s", EncodedEntry(bucket, key))
}
func uriSetImage(siteURL, bucket string) string {
return fmt.Sprintf("/image/%s/from/%s", bucket,
base64.URLEncoding.EncodeToString([]byte(siteURL)))
}
func uriSetImageWithHost(siteURL, bucket, host string) string {
return fmt.Sprintf("/image/%s/from/%s/host/%s", bucket,
base64.URLEncoding.EncodeToString([]byte(siteURL)),
base64.URLEncoding.EncodeToString([]byte(host)))
}
func uriUnsetImage(bucket string) string {
return fmt.Sprintf("/unimage/%s", bucket)
}
// uriListFiles builds the query path for the list API; empty filter values
// are omitted from the query string.
func uriListFiles(bucket, prefix, delimiter, marker string, limit int) string {
	query := url.Values{"bucket": {bucket}}
	if prefix != "" {
		query.Set("prefix", prefix)
	}
	if delimiter != "" {
		query.Set("delimiter", delimiter)
	}
	if marker != "" {
		query.Set("marker", marker)
	}
	if limit > 0 {
		query.Set("limit", strconv.Itoa(limit))
	}
	return fmt.Sprintf("/list?%s", query.Encode())
}
// EncodedEntry 生成URL Safe Base64编码的 Entry
func EncodedEntry(bucket, key string) string {
entry := fmt.Sprintf("%s:%s", bucket, key)
return base64.URLEncoding.EncodeToString([]byte(entry))
}
// EncodedEntryWithoutKey 生成 key 为null的情况下 URL Safe Base64编码的Entry
func EncodedEntryWithoutKey(bucket string) string {
return base64.URLEncoding.EncodeToString([]byte(bucket))
}
// MakePublicURL assembles the download URL of a file in a public bucket by
// joining domain and key and normalizing the result through url.Parse.
func MakePublicURL(domain, key string) (finalUrl string) {
	srcUri, _ := url.Parse(fmt.Sprintf("%s/%s", domain, key))
	finalUrl = srcUri.String()
	return
}
// MakePrivateURL builds a signed download URL for a file in a private bucket.
// deadline is a Unix timestamp (seconds) after which the link expires.
func MakePrivateURL(mac *qbox.Mac, domain, key string, deadline int64) (privateURL string) {
	publicURL := MakePublicURL(domain, key)
	urlToSign := publicURL
	// Append the expiry as query parameter "e", using '&' when the URL
	// already carries a query string.
	if strings.Contains(publicURL, "?") {
		urlToSign = fmt.Sprintf("%s&e=%d", urlToSign, deadline)
	} else {
		urlToSign = fmt.Sprintf("%s?e=%d", urlToSign, deadline)
	}
	// The token signs the full URL including the expiry.
	token := mac.Sign([]byte(urlToSign))
	privateURL = fmt.Sprintf("%s&token=%s", urlToSign, token)
	return
}

264
vendor/github.com/qiniu/api.v7/storage/bucket_test.go generated vendored Normal file
View File

@@ -0,0 +1,264 @@
package storage
import (
"fmt"
"math/rand"
"net/http"
"os"
"testing"
"time"
"github.com/qiniu/api.v7/auth/qbox"
)
var (
testAK = os.Getenv("QINIU_ACCESS_KEY")
testSK = os.Getenv("QINIU_SECRET_KEY")
testBucket = os.Getenv("QINIU_TEST_BUCKET")
testBucketPrivate = os.Getenv("QINIU_TEST_BUCKET_PRIVATE")
testBucketPrivateDomain = os.Getenv("QINIU_TEST_DOMAIN_PRIVATE")
testPipeline = os.Getenv("QINIU_TEST_PIPELINE")
testKey = "qiniu.png"
testFetchUrl = "http://devtools.qiniu.com/qiniu.png"
testSiteUrl = "http://devtools.qiniu.com"
)
var mac *qbox.Mac
var bucketManager *BucketManager
var operationManager *OperationManager
var formUploader *FormUploader
var resumeUploader *ResumeUploader
var base64Uploader *Base64Uploader
func init() {
if testAK == "" || testSK == "" {
panic("please run ./test-env.sh first")
}
mac = qbox.NewMac(testAK, testSK)
cfg := Config{}
cfg.Zone = &Zone_z0
cfg.UseCdnDomains = true
bucketManager = NewBucketManager(mac, &cfg)
operationManager = NewOperationManager(mac, &cfg)
formUploader = NewFormUploader(&cfg)
resumeUploader = NewResumeUploader(&cfg)
base64Uploader = NewBase64Uploader(&cfg)
rand.Seed(time.Now().Unix())
}
//Test get zone
func TestGetZone(t *testing.T) {
zone, err := GetZone(testAK, testBucket)
if err != nil {
t.Fatalf("GetZone() error, %s", err)
}
t.Log(zone.String())
}
//Test get bucket list
func TestBuckets(t *testing.T) {
shared := true
buckets, err := bucketManager.Buckets(shared)
if err != nil {
t.Fatalf("Buckets() error, %s", err)
}
for _, bucket := range buckets {
t.Log(bucket)
}
}
//Test get file info
func TestStat(t *testing.T) {
keysToStat := []string{"qiniu.png"}
for _, eachKey := range keysToStat {
info, err := bucketManager.Stat(testBucket, eachKey)
if err != nil {
t.Logf("Stat() error, %s", err)
t.Fail()
} else {
t.Logf("FileInfo:\n %s", info.String())
}
}
}
func TestCopyMoveDelete(t *testing.T) {
keysCopyTarget := []string{"qiniu_1.png", "qiniu_2.png", "qiniu_3.png"}
keysToDelete := make([]string, 0, len(keysCopyTarget))
for _, eachKey := range keysCopyTarget {
err := bucketManager.Copy(testBucket, testKey, testBucket, eachKey, true)
if err != nil {
t.Logf("Copy() error, %s", err)
t.Fail()
}
}
for _, eachKey := range keysCopyTarget {
keyToDelete := eachKey + "_move"
err := bucketManager.Move(testBucket, eachKey, testBucket, keyToDelete, true)
if err != nil {
t.Logf("Move() error, %s", err)
t.Fail()
} else {
keysToDelete = append(keysToDelete, keyToDelete)
}
}
for _, eachKey := range keysToDelete {
err := bucketManager.Delete(testBucket, eachKey)
if err != nil {
t.Logf("Delete() error, %s", err)
t.Fail()
}
}
}
func TestFetch(t *testing.T) {
ret, err := bucketManager.Fetch(testFetchUrl, testBucket, "qiniu-fetch.png")
if err != nil {
t.Logf("Fetch() error, %s", err)
t.Fail()
} else {
t.Logf("FetchRet:\n %s", ret.String())
}
}
func TestFetchWithoutKey(t *testing.T) {
ret, err := bucketManager.FetchWithoutKey(testFetchUrl, testBucket)
if err != nil {
t.Logf("FetchWithoutKey() error, %s", err)
t.Fail()
} else {
t.Logf("FetchRet:\n %s", ret.String())
}
}
func TestDeleteAfterDays(t *testing.T) {
deleteKey := testKey + "_deleteAfterDays"
days := 7
bucketManager.Copy(testBucket, testKey, testBucket, deleteKey, true)
err := bucketManager.DeleteAfterDays(testBucket, deleteKey, days)
if err != nil {
t.Logf("DeleteAfterDays() error, %s", err)
t.Fail()
}
}
func TestChangeMime(t *testing.T) {
toChangeKey := testKey + "_changeMime"
bucketManager.Copy(testBucket, testKey, testBucket, toChangeKey, true)
newMime := "text/plain"
err := bucketManager.ChangeMime(testBucket, toChangeKey, newMime)
if err != nil {
t.Fatalf("ChangeMime() error, %s", err)
}
info, err := bucketManager.Stat(testBucket, toChangeKey)
if err != nil || info.MimeType != newMime {
t.Fatalf("ChangeMime() failed, %s", err)
}
bucketManager.Delete(testBucket, toChangeKey)
}
func TestChangeType(t *testing.T) {
toChangeKey := fmt.Sprintf("%s_changeType_%d", testKey, rand.Int())
bucketManager.Copy(testBucket, testKey, testBucket, toChangeKey, true)
fileType := 1
err := bucketManager.ChangeType(testBucket, toChangeKey, fileType)
if err != nil {
t.Fatalf("ChangeType() error, %s", err)
}
info, err := bucketManager.Stat(testBucket, toChangeKey)
if err != nil || info.Type != fileType {
t.Fatalf("ChangeMime() failed, %s", err)
}
bucketManager.Delete(testBucket, toChangeKey)
}
func TestPrefetchAndImage(t *testing.T) {
err := bucketManager.SetImage(testSiteUrl, testBucket)
if err != nil {
t.Fatalf("SetImage() error, %s", err)
}
err = bucketManager.Prefetch(testBucket, testKey)
if err != nil {
t.Fatalf("Prefetch() error, %s", err)
}
err = bucketManager.UnsetImage(testBucket)
if err != nil {
t.Fatalf("UnsetImage() error, %s", err)
}
}
func TestListFiles(t *testing.T) {
limit := 100
prefix := "listfiles/"
for i := 0; i < limit; i++ {
newKey := fmt.Sprintf("%s%s/%d", prefix, testKey, i)
bucketManager.Copy(testBucket, testKey, testBucket, newKey, true)
}
entries, _, _, hasNext, err := bucketManager.ListFiles(testBucket, prefix, "", "", limit)
if err != nil {
t.Fatalf("ListFiles() error, %s", err)
}
if hasNext {
t.Fatalf("ListFiles() failed, unexpected hasNext")
}
if len(entries) != limit {
t.Fatalf("ListFiles() failed, unexpected items count, expected: %d, actual: %d", limit, len(entries))
}
for _, entry := range entries {
t.Logf("ListItem:\n%s", entry.String())
}
}
func TestMakePrivateUrl(t *testing.T) {
deadline := time.Now().Add(time.Second * 3600).Unix()
privateURL := MakePrivateURL(mac, "http://"+testBucketPrivateDomain, testKey, deadline)
t.Logf("PrivateUrl: %s", privateURL)
resp, respErr := http.Get(privateURL)
if respErr != nil {
t.Fatalf("MakePrivateUrl() error, %s", respErr)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("MakePrivateUrl() error, %s", resp.Status)
}
}
// TestBatch copies testKey to many keys via a batch copy, then stats them
// all via a batch stat.
func TestBatch(t *testing.T) {
	copyCnt := 100
	copyOps := make([]string, 0, copyCnt)
	testKeys := make([]string, 0, copyCnt)
	for i := 0; i < copyCnt; i++ {
		cpKey := fmt.Sprintf("%s_batchcopy_%d", testKey, i)
		testKeys = append(testKeys, cpKey)
		copyOps = append(copyOps, URICopy(testBucket, testKey, testBucket, cpKey, true))
	}
	_, bErr := bucketManager.Batch(copyOps)
	if bErr != nil {
		t.Fatalf("BatchCopy error, %s", bErr)
	}
	statOps := make([]string, 0, copyCnt)
	for _, k := range testKeys {
		statOps = append(statOps, URIStat(testBucket, k))
	}
	// Bug fix: the original re-ran Batch(copyOps) here, overwriting bErr
	// from the stat batch before it was checked, so stat failures were
	// silently masked (and the copies ran twice).
	batchOpRets, bErr := bucketManager.Batch(statOps)
	if bErr != nil {
		t.Fatalf("BatchStat error, %s", bErr)
	}
	t.Logf("BatchStat: %v", batchOpRets)
}

38
vendor/github.com/qiniu/api.v7/storage/client.go generated vendored Normal file
View File

@@ -0,0 +1,38 @@
package storage
import (
"github.com/qiniu/api.v7/auth/qbox"
"github.com/qiniu/x/rpc.v7"
"net/http"
)
type Transport struct {
mac qbox.Mac
Transport http.RoundTripper
}
func (t *Transport) NestedObject() interface{} {
return t.Transport
}
func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
token, err := t.mac.SignRequest(req)
if err != nil {
return
}
req.Header.Set("Authorization", "QBox "+token)
return t.Transport.RoundTrip(req)
}
func NewTransport(mac *qbox.Mac, transport http.RoundTripper) *Transport {
if transport == nil {
transport = http.DefaultTransport
}
t := &Transport{mac: *mac, Transport: transport}
return t
}
func NewClient(mac *qbox.Mac, transport http.RoundTripper) *rpc.Client {
t := NewTransport(mac, transport)
return &rpc.Client{&http.Client{Transport: t}}
}

8
vendor/github.com/qiniu/api.v7/storage/config.go generated vendored Normal file
View File

@@ -0,0 +1,8 @@
package storage
// Config aggregates the settings shared by uploads and resource management.
type Config struct {
	Zone          *Zone // zone (region) the bucket lives in; nil means it is looked up per request
	UseHTTPS      bool  // use the https:// scheme for API hosts
	UseCdnDomains bool  // prefer CDN-accelerated upload domains
}

10
vendor/github.com/qiniu/api.v7/storage/doc.go generated vendored Normal file
View File

@@ -0,0 +1,10 @@
// storage 包提供了资源的上传,管理,数据处理等功能。其中资源的上传又提供了表单上传的方式以及分片上传的方式,其中分片上传的方式还支持断点续传。
//
// 该包中提供了 BucketManager 用来进行资源管理,比如获取文件信息,文件复制,删除,重命名等,以及很多高级功能如修改文件类型,
// 修改文件的生命周期,修改文件的存储类型等。
//
// 该包中还提供了 FormUploader 和 ResumeUploader 来分别支持表单上传和分片上传断点续传等功能对于较大的文件比如100MB以上的文件一般
// 建议采用分片上传的方式来保证上传的效率和可靠性。
//
// 对于数据处理,则提供了 OperationManager可以使用它来发送持久化数据处理请求及查询数据处理的状态。
package storage

345
vendor/github.com/qiniu/api.v7/storage/form_upload.go generated vendored Normal file
View File

@@ -0,0 +1,345 @@
package storage
import (
"bytes"
"context"
"fmt"
"hash"
"hash/crc32"
"io"
"mime/multipart"
"net/textproto"
"os"
"path"
"path/filepath"
"strings"
"github.com/qiniu/x/rpc.v7"
)
// PutExtra 为表单上传的额外可选项
type PutExtra struct {
// 可选,用户自定义参数,必须以 "x:" 开头。若不以x:开头,则忽略。
Params map[string]string
// 可选,当为 "" 时候,服务端自动判断。
MimeType string
// 上传事件:进度通知。这个事件的回调函数应该尽可能快地结束。
OnProgress func(fsize, uploaded int64)
}
// PutRet 为七牛标准的上传回复内容。
// 如果使用了上传回调或者自定义了returnBody那么需要根据实际情况自己自定义一个返回值结构体
type PutRet struct {
Hash string `json:"hash"`
PersistentID string `json:"persistentId"`
Key string `json:"key"`
}
// FormUploader 表示一个表单上传的对象
type FormUploader struct {
client *rpc.Client
cfg *Config
}
// NewFormUploader 用来构建一个表单上传的对象
func NewFormUploader(cfg *Config) *FormUploader {
if cfg == nil {
cfg = &Config{}
}
return &FormUploader{
client: &rpc.DefaultClient,
cfg: cfg,
}
}
// NewFormUploaderEx 用来构建一个表单上传的对象
func NewFormUploaderEx(cfg *Config, client *rpc.Client) *FormUploader {
if cfg == nil {
cfg = &Config{}
}
if client == nil {
client = &rpc.DefaultClient
}
return &FormUploader{
client: client,
cfg: cfg,
}
}
// PutFile 用来以表单方式上传一个文件,和 Put 不同的只是一个通过提供文件路径来访问文件内容,一个通过 io.Reader 来访问。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 callbackUrl 或 returnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// key 是要上传的文件访问路径。比如:"foo/bar.jpg"。注意我们建议 key 不要以 '/' 开头。另外key 为空字符串是合法的。
// localFile 是要上传的文件的本地路径。
// extra 是上传的一些可选项可以指定为nil。详细见 PutExtra 结构的描述。
//
func (p *FormUploader) PutFile(
ctx context.Context, ret interface{}, uptoken, key, localFile string, extra *PutExtra) (err error) {
return p.putFile(ctx, ret, uptoken, key, true, localFile, extra)
}
// PutFileWithoutKey 用来以表单方式上传一个文件。不指定文件上传后保存的key的情况下文件命名方式首先看看
// uptoken 中是否设置了 saveKey如果设置了 saveKey那么按 saveKey 要求的规则生成 key否则自动以文件的 hash 做 key。
// 和 Put 不同的只是一个通过提供文件路径来访问文件内容,一个通过 io.Reader 来访问。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 CallbackUrl 或 ReturnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// localFile 是要上传的文件的本地路径。
// extra 是上传的一些可选项。可以指定为nil。详细见 PutExtra 结构的描述。
//
func (p *FormUploader) PutFileWithoutKey(
ctx context.Context, ret interface{}, uptoken, localFile string, extra *PutExtra) (err error) {
return p.putFile(ctx, ret, uptoken, "", false, localFile, extra)
}
func (p *FormUploader) putFile(
ctx context.Context, ret interface{}, uptoken string,
key string, hasKey bool, localFile string, extra *PutExtra) (err error) {
f, err := os.Open(localFile)
if err != nil {
return
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return
}
fsize := fi.Size()
if extra == nil {
extra = &PutExtra{}
}
return p.put(ctx, ret, uptoken, key, hasKey, f, fsize, extra, filepath.Base(localFile))
}
// Put 用来以表单方式上传一个文件。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 callbackUrl 或 returnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// key 是要上传的文件访问路径。比如:"foo/bar.jpg"。注意我们建议 key 不要以 '/' 开头。另外key 为空字符串是合法的。
// data 是文件内容的访问接口io.Reader
// fsize 是要上传的文件大小。
// extra 是上传的一些可选项。可以指定为nil。详细见 PutExtra 结构的描述。
//
func (p *FormUploader) Put(
ctx context.Context, ret interface{}, uptoken, key string, data io.Reader, size int64, extra *PutExtra) (err error) {
err = p.put(ctx, ret, uptoken, key, true, data, size, extra, path.Base(key))
return
}
// PutWithoutKey 用来以表单方式上传一个文件。不指定文件上传后保存的key的情况下文件命名方式首先看看 uptoken 中是否设置了 saveKey
// 如果设置了 saveKey那么按 saveKey 要求的规则生成 key否则自动以文件的 hash 做 key。
//
// ctx 是请求的上下文。
// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 CallbackUrl 或 ReturnBody那么返回的数据结构是 PutRet 结构。
// uptoken 是由业务服务器颁发的上传凭证。
// data 是文件内容的访问接口io.Reader
// fsize 是要上传的文件大小。
// extra 是上传的一些可选项。详细见 PutExtra 结构的描述。
//
func (p *FormUploader) PutWithoutKey(
ctx context.Context, ret interface{}, uptoken string, data io.Reader, size int64, extra *PutExtra) (err error) {
err = p.put(ctx, ret, uptoken, "", false, data, size, extra, "filename")
return err
}
func (p *FormUploader) put(
ctx context.Context, ret interface{}, uptoken string,
key string, hasKey bool, data io.Reader, size int64, extra *PutExtra, fileName string) (err error) {
ak, bucket, gErr := getAkBucketFromUploadToken(uptoken)
if gErr != nil {
err = gErr
return
}
var upHost string
upHost, err = p.upHost(ak, bucket)
if err != nil {
return
}
var b bytes.Buffer
writer := multipart.NewWriter(&b)
if extra == nil {
extra = &PutExtra{}
}
if extra.OnProgress != nil {
data = &readerWithProgress{reader: data, fsize: size, onProgress: extra.OnProgress}
}
err = writeMultipart(writer, uptoken, key, hasKey, extra, fileName)
if err != nil {
return
}
var dataReader io.Reader
h := crc32.NewIEEE()
dataReader = io.TeeReader(data, h)
crcReader := newCrc32Reader(writer.Boundary(), h)
//write file
head := make(textproto.MIMEHeader)
head.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`,
escapeQuotes(fileName)))
if extra.MimeType != "" {
head.Set("Content-Type", extra.MimeType)
}
_, err = writer.CreatePart(head)
if err != nil {
return
}
lastLine := fmt.Sprintf("\r\n--%s--\r\n", writer.Boundary())
r := strings.NewReader(lastLine)
bodyLen := int64(-1)
if size >= 0 {
bodyLen = int64(b.Len()) + size + int64(len(lastLine))
bodyLen += crcReader.length()
}
mr := io.MultiReader(&b, dataReader, crcReader, r)
contentType := writer.FormDataContentType()
err = p.client.CallWith64(ctx, ret, "POST", upHost, contentType, mr, bodyLen)
if err != nil {
return
}
if extra.OnProgress != nil {
extra.OnProgress(size, size)
}
return
}
type crc32Reader struct {
h hash.Hash32
boundary string
r io.Reader
flag bool
nlDashBoundaryNl string
header string
crc32PadLen int64
}
func newCrc32Reader(boundary string, h hash.Hash32) *crc32Reader {
nlDashBoundaryNl := fmt.Sprintf("\r\n--%s\r\n", boundary)
header := `Content-Disposition: form-data; name="crc32"` + "\r\n\r\n"
return &crc32Reader{
h: h,
boundary: boundary,
nlDashBoundaryNl: nlDashBoundaryNl,
header: header,
crc32PadLen: 10,
}
}
func (r *crc32Reader) Read(p []byte) (int, error) {
if r.flag == false {
crc32 := r.h.Sum32()
crc32Line := r.nlDashBoundaryNl + r.header + fmt.Sprintf("%010d", crc32) //padding crc32 results to 10 digits
r.r = strings.NewReader(crc32Line)
r.flag = true
}
return r.r.Read(p)
}
func (r crc32Reader) length() (length int64) {
return int64(len(r.nlDashBoundaryNl+r.header)) + r.crc32PadLen
}
// upHost resolves the upload host for the given access key and bucket.
// A zone explicitly set in the config wins; otherwise the zone is queried
// via GetZone. UseHTTPS and UseCdnDomains select the scheme and host set.
func (p *FormUploader) upHost(ak, bucket string) (upHost string, err error) {
	zone := p.cfg.Zone
	if zone == nil {
		zone, err = GetZone(ak, bucket)
		if err != nil {
			return
		}
	}
	scheme := "http://"
	if p.cfg.UseHTTPS {
		scheme = "https://"
	}
	host := zone.SrcUpHosts[0]
	if p.cfg.UseCdnDomains {
		host = zone.CdnUpHosts[0]
	}
	upHost = scheme + host
	return
}
type readerWithProgress struct {
reader io.Reader
uploaded int64
fsize int64
onProgress func(fsize, uploaded int64)
}
func (p *readerWithProgress) Read(b []byte) (n int, err error) {
if p.uploaded > 0 {
p.onProgress(p.fsize, p.uploaded)
}
n, err = p.reader.Read(b)
p.uploaded += int64(n)
return
}
// writeMultipart writes the non-file form fields into the multipart body:
// the upload token, the optional key, and user-defined variables.
//
// Only params whose name starts with "x:" and whose value is non-empty are
// forwarded — everything else is silently dropped, per the upload API's
// custom-variable contract. fileName is accepted for signature compatibility;
// the file part itself is written by the caller.
func writeMultipart(writer *multipart.Writer, uptoken, key string, hasKey bool,
	extra *PutExtra, fileName string) (err error) {
	// token
	if err = writer.WriteField("token", uptoken); err != nil {
		return
	}
	// key (absent when the server derives the key)
	if hasKey {
		if err = writer.WriteField("key", key); err != nil {
			return
		}
	}
	// user-defined variables; ranging over a nil map is a no-op, so no
	// explicit nil check is needed
	for k, v := range extra.Params {
		if strings.HasPrefix(k, "x:") && v != "" {
			if err = writer.WriteField(k, v); err != nil {
				return
			}
		}
	}
	return
}
// quoteEscaper backslash-escapes the two characters that may not appear raw
// inside a quoted Content-Disposition attribute value.
var quoteEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)

// escapeQuotes escapes backslashes and double quotes in s so it can be
// embedded safely in a quoted multipart header value (e.g. filename="...").
func escapeQuotes(s string) string {
	return quoteEscaper.Replace(s)
}

View File

@@ -0,0 +1,31 @@
package storage
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"testing"
)
var (
	// testLocalFile is a small file known to exist in CI: the repository
	// Makefile under TRAVIS_BUILD_DIR.
	testLocalFile = filepath.Join(os.Getenv("TRAVIS_BUILD_DIR"), "Makefile")
)

// TestFormUploadPutFile uploads a local file via form upload under a random
// key and checks the call succeeds. Integration test: relies on package-level
// fixtures (testBucket, mac, formUploader) and live credentials.
func TestFormUploadPutFile(t *testing.T) {
	var putRet PutRet
	ctx := context.TODO()
	putPolicy := PutPolicy{
		Scope:           testBucket,
		DeleteAfterDays: 7, // auto-delete the test object after a week
	}
	upToken := putPolicy.UploadToken(mac)
	testKey := fmt.Sprintf("testPutFileKey_%d", rand.Int())
	err := formUploader.PutFile(ctx, &putRet, upToken, testKey, testLocalFile, nil)
	if err != nil {
		t.Fatalf("FormUploader#PutFile() error, %s", err)
	}
	t.Logf("Key: %s, Hash:%s", putRet.Key, putRet.Hash)
}

5
vendor/github.com/qiniu/api.v7/storage/init.go generated vendored Normal file
View File

@@ -0,0 +1,5 @@
package storage
import (
_ "github.com/qiniu/api.v7/conf"
)

206
vendor/github.com/qiniu/api.v7/storage/pfop.go generated vendored Normal file
View File

@@ -0,0 +1,206 @@
package storage
import (
"context"
"fmt"
"github.com/qiniu/api.v7/auth/qbox"
"github.com/qiniu/x/rpc.v7"
)
// OperationManager provides the persistent data-processing operations
// (pfop submission and prefop status queries).
type OperationManager struct {
	client *rpc.Client // rpc client used for the API calls
	mac    *qbox.Mac   // credentials used to build the api host query
	cfg    *Config     // zone / https configuration
}
// NewOperationManager builds a data-processing manager backed by a client
// authorized with the given credentials. A nil cfg means defaults.
func NewOperationManager(mac *qbox.Mac, cfg *Config) *OperationManager {
	if cfg == nil {
		cfg = &Config{}
	}
	m := OperationManager{
		client: NewClient(mac, nil),
		mac:    mac,
		cfg:    cfg,
	}
	return &m
}
// NewOperationManagerEx builds a data-processing manager using the supplied
// *rpc.Client; a nil client falls back to a default credential-signed one,
// and a nil cfg means defaults. (Comment fixed: it previously named
// NewOperationManager.)
func NewOperationManagerEx(mac *qbox.Mac, cfg *Config, client *rpc.Client) *OperationManager {
	if cfg == nil {
		cfg = &Config{}
	}
	if client == nil {
		client = NewClient(mac, nil)
	}
	return &OperationManager{
		client: client,
		mac:    mac,
		cfg:    cfg,
	}
}
// PfopRet is the reply of a pfop (persistent processing) submission.
type PfopRet struct {
	PersistentID string `json:"persistentId,omitempty"` // task id, pollable via Prefop
}

// PrefopRet is the reply of a prefop (processing status) query.
type PrefopRet struct {
	ID          string `json:"id"`
	Code        int    `json:"code"`
	Desc        string `json:"desc"`
	InputBucket string `json:"inputBucket,omitempty"` // bucket of the source object
	InputKey    string `json:"inputKey,omitempty"`    // key of the source object
	Pipeline    string `json:"pipeline,omitempty"`    // processing queue used
	Reqid       string `json:"reqid,omitempty"`
	Items       []FopResult // one result per processing command
}
// String renders the processing status in a human-readable, multi-line form:
// the task header, the source/queue fields that are set, then one indented
// section per processing command.
func (r *PrefopRet) String() string {
	strData := fmt.Sprintf("Id: %s\r\nCode: %d\r\nDesc: %s\r\n", r.ID, r.Code, r.Desc)
	// optional header fields (each line ends with a bare "\n", matching the
	// previous fmt.Sprintln-based output)
	if r.InputBucket != "" {
		strData += fmt.Sprintf("InputBucket: %s\n", r.InputBucket)
	}
	if r.InputKey != "" {
		strData += fmt.Sprintf("InputKey: %s\n", r.InputKey)
	}
	if r.Pipeline != "" {
		strData += fmt.Sprintf("Pipeline: %s\n", r.Pipeline)
	}
	if r.Reqid != "" {
		strData += fmt.Sprintf("Reqid: %s\n", r.Reqid)
	}
	// blank separator line (previously strData = fmt.Sprintln(strData),
	// which recopied the whole string just to append "\n")
	strData += "\n"
	for _, item := range r.Items {
		strData += fmt.Sprintf("\tCmd:\t%s\r\n\tCode:\t%d\r\n\tDesc:\t%s\r\n", item.Cmd, item.Code, item.Desc)
		if item.Error != "" {
			strData += fmt.Sprintf("\tError:\t%s\r\n", item.Error)
		} else {
			if item.Hash != "" {
				strData += fmt.Sprintf("\tHash:\t%s\r\n", item.Hash)
			}
			if item.Key != "" {
				strData += fmt.Sprintf("\tKey:\t%s\r\n", item.Key)
			}
			if len(item.Keys) > 0 {
				strData += "\tKeys: {\r\n"
				for _, key := range item.Keys {
					strData += fmt.Sprintf("\t\t%s\r\n", key)
				}
				strData += "\t}\r\n"
			}
		}
		strData += "\r\n"
	}
	return strData
}
// FopResult is the status of one processing command within a pfop task.
type FopResult struct {
	Cmd   string   `json:"cmd"`             // the processing command
	Code  int      `json:"code"`            // status code of this command
	Desc  string   `json:"desc"`            // human-readable status
	Error string   `json:"error,omitempty"` // set when the command failed
	Hash  string   `json:"hash,omitempty"`  // hash of the produced object
	Key   string   `json:"key,omitempty"`   // key of the produced object
	Keys  []string `json:"keys,omitempty"`  // keys when a command produces several objects
}
// Pfop submits a persistent data-processing request.
//
// bucket is the bucket holding the source object and key its name.
// fops is the processing command list (";"-joined commands).
// pipeline optionally names a dedicated processing queue ("" = shared queue).
// notifyURL optionally receives the result notification callback.
// force, when true, forces re-processing.
//
// On success it returns the persistent task id, which can be polled with
// Prefop.
func (m *OperationManager) Pfop(bucket, key, fops, pipeline, notifyURL string,
	force bool) (persistentID string, err error) {
	pfopParams := map[string][]string{
		"bucket": {bucket},
		"key":    {key},
		"fops":   {fops},
	}
	if pipeline != "" {
		pfopParams["pipeline"] = []string{pipeline}
	}
	if notifyURL != "" {
		pfopParams["notifyURL"] = []string{notifyURL}
	}
	if force {
		pfopParams["force"] = []string{"1"}
	}
	var ret PfopRet
	ctx := context.TODO()
	reqHost, reqErr := m.apiHost(bucket)
	if reqErr != nil {
		err = reqErr
		return
	}
	reqURL := fmt.Sprintf("%s/pfop/", reqHost)
	err = m.client.CallWithForm(ctx, &ret, "POST", reqURL, pfopParams)
	if err != nil {
		return
	}
	persistentID = ret.PersistentID
	return
}
// Prefop queries the status of a persistent processing task by its id.
func (m *OperationManager) Prefop(persistentID string) (ret PrefopRet, err error) {
	reqHost := m.prefopApiHost(persistentID)
	reqURL := fmt.Sprintf("%s/status/get/prefop?id=%s", reqHost, persistentID)
	err = m.client.Call(context.TODO(), &ret, "GET", reqURL)
	return
}
// apiHost resolves the data-processing api host for bucket: the configured
// zone if set, otherwise the zone queried for this access key and bucket.
func (m *OperationManager) apiHost(bucket string) (apiHost string, err error) {
	zone := m.cfg.Zone
	if zone == nil {
		zone, err = GetZone(m.mac.AccessKey, bucket)
		if err != nil {
			return
		}
	}
	scheme := "http://"
	if m.cfg.UseHTTPS {
		scheme = "https://"
	}
	apiHost = scheme + zone.ApiHost
	return
}
// prefopApiHost picks the api host used for prefop status queries.
// The persistentID parameter is kept for signature compatibility; the host
// depends only on the configured zone and HTTPS setting.
func (m *OperationManager) prefopApiHost(persistentID string) (apiHost string) {
	apiHost = "api.qiniu.com"
	if m.cfg.Zone != nil {
		apiHost = m.cfg.Zone.ApiHost
	}
	scheme := "http://"
	if m.cfg.UseHTTPS {
		scheme = "https://"
	}
	return scheme + apiHost
}

45
vendor/github.com/qiniu/api.v7/storage/pfop_test.go generated vendored Normal file
View File

@@ -0,0 +1,45 @@
package storage
import (
"encoding/base64"
"fmt"
"strings"
"testing"
)
var (
	// testVideoKey names the sample video object processed by the pfop tests.
	testVideoKey = "qiniu.mp4"
)

// TestPrefop queries the status of a known persistent task id.
// Integration test: relies on the package-level operationManager fixture.
func TestPrefop(t *testing.T) {
	pid := "na0.597802c092129336c20f3f91"
	prefopRet, err := operationManager.Prefop(pid)
	if err != nil {
		t.Fatalf("Prefop() error, %s", err)
	}
	t.Logf("%s", prefopRet.String())
}
// TestPfop submits a batch of three processing commands (transcode, frame
// grab, frame sampling) against the sample video and logs the task id.
// Integration test: relies on package-level fixtures (testBucket,
// testPipeline, operationManager).
func TestPfop(t *testing.T) {
	saveBucket := testBucket
	// transcode to mp4 and save under a fixed key
	fopAvthumb := fmt.Sprintf("avthumb/mp4/s/480x320/vb/500k|saveas/%s",
		EncodedEntry(saveBucket, "pfop_test_qiniu.mp4"))
	// grab a single frame at 10s
	fopVframe := fmt.Sprintf("vframe/jpg/offset/10|saveas/%s",
		EncodedEntry(saveBucket, "pfop_test_qiniu.jpg"))
	// sample a frame every 20s using a url-safe-base64 name pattern
	fopVsample := fmt.Sprintf("vsample/jpg/interval/20/pattern/%s",
		base64.URLEncoding.EncodeToString([]byte("pfop_test_$(count).jpg")))
	fopBatch := []string{fopAvthumb, fopVframe, fopVsample}
	fops := strings.Join(fopBatch, ";")
	force := true
	notifyURL := ""
	pid, err := operationManager.Pfop(testBucket, testVideoKey, fops,
		testPipeline, notifyURL, force)
	if err != nil {
		t.Fatalf("Pfop() error, %s", err)
	}
	t.Logf("persistentId: %s", pid)
}

200
vendor/github.com/qiniu/api.v7/storage/resume_base.go generated vendored Normal file
View File

@@ -0,0 +1,200 @@
package storage
import (
"context"
"encoding/base64"
"fmt"
"hash/crc32"
"io"
"net/http"
"strconv"
"github.com/qiniu/x/bytes.v7"
"github.com/qiniu/x/rpc.v7"
"github.com/qiniu/x/xlog.v7"
)
// ResumeUploader performs resumable (chunked/block) uploads.
type ResumeUploader struct {
	client *rpc.Client // rpc client for mkblk/bput/mkfile requests
	cfg    *Config     // zone / https / cdn configuration
}
// NewResumeUploader builds a resumable uploader backed by the default rpc
// client. A nil cfg means default configuration.
func NewResumeUploader(cfg *Config) *ResumeUploader {
	if cfg == nil {
		cfg = &Config{}
	}
	return &ResumeUploader{
		client: &rpc.DefaultClient,
		cfg:    cfg,
	}
}
// NewResumeUploaderEx builds a resumable uploader with a caller-supplied
// rpc client; nil falls back to the default client, and a nil cfg means
// default configuration.
func NewResumeUploaderEx(cfg *Config, client *rpc.Client) *ResumeUploader {
	if cfg == nil {
		cfg = &Config{}
	}
	if client == nil {
		client = &rpc.DefaultClient
	}
	u := ResumeUploader{
		client: client,
		cfg:    cfg,
	}
	return &u
}
// 分片上传请求
type uptokenTransport struct {
token string
Transport http.RoundTripper
}
func (t *uptokenTransport) NestedObject() interface{} {
return t.Transport
}
func (t *uptokenTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
req.Header.Set("Authorization", t.token)
return t.Transport.RoundTrip(req)
}
func newUptokenTransport(token string, transport http.RoundTripper) *uptokenTransport {
if transport == nil {
transport = http.DefaultTransport
}
return &uptokenTransport{"UpToken " + token, transport}
}
// newUptokenClient rewires client's transport so every request it sends is
// authorized with the given upload token.
//
// NOTE(review): this mutates the *rpc.Client passed in (including
// rpc.DefaultClient when that is what the caller holds) instead of returning
// an independent copy — verify this cannot leak one upload's token into
// unrelated requests made through the same client.
func newUptokenClient(client *rpc.Client, token string) *rpc.Client {
	t := newUptokenTransport(token, client.Transport)
	client.Transport = t
	return client
}
// Mkblk creates a new block of blockSize bytes and uploads its first chunk
// (size bytes read from body). The reply in ret carries the block ctx and
// host used by subsequent Bput calls.
func (p *ResumeUploader) Mkblk(
	ctx context.Context, upHost string, ret *BlkputRet, blockSize int, body io.Reader, size int) error {
	url := upHost + "/mkblk/" + strconv.Itoa(blockSize)
	return p.client.CallWith(ctx, ret, "POST", url, "application/octet-stream", body, size)
}

// Bput uploads the next chunk (size bytes from body) of a block, resuming
// at ret.Offset on the host returned by the previous mkblk/bput reply.
func (p *ResumeUploader) Bput(
	ctx context.Context, ret *BlkputRet, body io.Reader, size int) error {
	url := ret.Host + "/bput/" + ret.Ctx + "/" + strconv.FormatUint(uint64(ret.Offset), 10)
	return p.client.CallWith(ctx, ret, "POST", url, "application/octet-stream", body, size)
}
// resumableBput uploads one whole block (index blkIdx, blkSize bytes) of f
// chunk by chunk, resuming from the progress recorded in ret.
//
// If ret.Ctx is empty the block is started with Mkblk; otherwise upload
// resumes at ret.Offset with Bput. Each chunk's crc32 is verified against
// the server reply, failed chunks are retried up to extra.TryTimes, and an
// InvalidCtx reply clears ret.Ctx so the caller can restart the block.
func (p *ResumeUploader) resumableBput(
	ctx context.Context, upHost string, ret *BlkputRet, f io.ReaderAt, blkIdx, blkSize int, extra *RputExtra) (err error) {
	log := xlog.NewWith(ctx)
	h := crc32.NewIEEE()
	// byte offset of this block within the file
	offbase := int64(blkIdx) << blockBits
	chunkSize := extra.ChunkSize
	var bodyLength int
	if ret.Ctx == "" {
		// fresh block: create it with the first chunk
		if chunkSize < blkSize {
			bodyLength = chunkSize
		} else {
			bodyLength = blkSize
		}
		body1 := io.NewSectionReader(f, offbase, int64(bodyLength))
		body := io.TeeReader(body1, h)
		err = p.Mkblk(ctx, upHost, ret, blkSize, body, bodyLength)
		if err != nil {
			return
		}
		// verify server-side crc32 and acknowledged offset
		if ret.Crc32 != h.Sum32() || int(ret.Offset) != bodyLength {
			err = ErrUnmatchedChecksum
			return
		}
		extra.Notify(blkIdx, blkSize, ret)
	}
	// upload the remaining chunks until the whole block is acknowledged
	for int(ret.Offset) < blkSize {
		if chunkSize < blkSize-int(ret.Offset) {
			bodyLength = chunkSize
		} else {
			bodyLength = blkSize - int(ret.Offset)
		}
		tryTimes := extra.TryTimes
	lzRetry:
		h.Reset()
		body1 := io.NewSectionReader(f, offbase+int64(ret.Offset), int64(bodyLength))
		body := io.TeeReader(body1, h)
		err = p.Bput(ctx, ret, body, bodyLength)
		if err == nil {
			if ret.Crc32 == h.Sum32() {
				extra.Notify(blkIdx, blkSize, ret)
				continue
			}
			log.Warn("ResumableBlockput: invalid checksum, retry")
			err = ErrUnmatchedChecksum
		} else {
			if ei, ok := err.(*rpc.ErrorInfo); ok && ei.Code == InvalidCtx {
				// the block ctx expired; caller must restart this block
				ret.Ctx = "" // reset
				log.Warn("ResumableBlockput: invalid ctx, please retry")
				return
			}
			log.Warn("ResumableBlockput: bput failed -", err)
		}
		if tryTimes > 1 {
			tryTimes--
			log.Info("ResumableBlockput retrying ...")
			goto lzRetry
		}
		break
	}
	return
}
// Mkfile assembles previously uploaded blocks into the final object by
// POSTing the comma-joined block ctxs to /mkfile, with the mime type, key
// and user variables encoded into the URL path.
//
// Consistent with the RputExtra contract (and with form upload), only user
// params whose name starts with "x:" and whose value is non-empty are
// forwarded; other entries are ignored.
func (p *ResumeUploader) Mkfile(
	ctx context.Context, upHost string, ret interface{}, key string, hasKey bool, fsize int64, extra *RputExtra) (err error) {
	url := upHost + "/mkfile/" + strconv.FormatInt(fsize, 10)
	if extra.MimeType != "" {
		url += "/mimeType/" + encode(extra.MimeType)
	}
	if hasKey {
		url += "/key/" + encode(key)
	}
	for k, v := range extra.Params {
		// forward only "x:"-prefixed vars with non-empty values
		// ("strings" is not imported in this file, hence the manual prefix test)
		if len(k) >= 2 && k[:2] == "x:" && v != "" {
			url += fmt.Sprintf("/%s/%s", k, encode(v))
		}
	}
	// join block ctxs with commas; 196 bytes is a generous per-ctx estimate
	buf := make([]byte, 0, 196*len(extra.Progresses))
	for _, prog := range extra.Progresses {
		buf = append(buf, prog.Ctx...)
		buf = append(buf, ',')
	}
	if len(buf) > 0 {
		buf = buf[:len(buf)-1] // drop the trailing comma
	}
	return p.client.CallWith(
		ctx, ret, "POST", url, "application/octet-stream", bytes.NewReader(buf), len(buf))
}
// encode url-safe-base64-encodes raw for safe embedding in an upload API
// URL path segment.
func encode(raw string) string {
	enc := base64.URLEncoding
	return enc.EncodeToString([]byte(raw))
}

311
vendor/github.com/qiniu/api.v7/storage/resume_upload.go generated vendored Normal file
View File

@@ -0,0 +1,311 @@
package storage
import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"github.com/qiniu/x/xlog.v7"
)
// Errors a resumable upload may return.
var (
	ErrInvalidPutProgress = errors.New("invalid put progress") // resume progress length does not match the file's block count
	ErrPutFailed          = errors.New("resumable put failed") // at least one block still failed after all retries
	ErrUnmatchedChecksum  = errors.New("unmatched checksum")   // server crc32 disagrees with the locally computed one
	ErrBadToken           = errors.New("invalid token")
)

// Server status codes indicating expired upload progress.
const (
	// InvalidCtx (701) means the block ctx is invalid or has expired
	// (unused for too long); the block must be re-uploaded from scratch.
	InvalidCtx = 701
)
// Defaults for resumable uploads.
const (
	defaultWorkers   = 4               // concurrent block uploads
	defaultChunkSize = 4 * 1024 * 1024 // chunk size: 4MB
	defaultTryTimes  = 3               // bput retries per chunk
)

// Settings tunes resumable uploads.
type Settings struct {
	TaskQsize int // optional; task queue size, 0 means Workers * 4
	Workers   int // number of parallel upload goroutines
	ChunkSize int // chunk size, 0 means 4MB
	TryTimes  int // retry count, 0 means 3
}

// settings is the package-wide resumable upload configuration.
var settings = Settings{
	TaskQsize: defaultWorkers * 4,
	Workers:   defaultWorkers,
	ChunkSize: defaultChunkSize,
	TryTimes:  defaultTryTimes,
}

// SetSettings overrides the package-wide resumable upload settings.
// Zero-valued fields fall back to their defaults. A nil argument is ignored
// (previously it caused a nil-pointer panic).
func SetSettings(v *Settings) {
	if v == nil {
		return
	}
	settings = *v
	if settings.Workers == 0 {
		settings.Workers = defaultWorkers
	}
	if settings.TaskQsize == 0 {
		settings.TaskQsize = settings.Workers * 4
	}
	if settings.ChunkSize == 0 {
		settings.ChunkSize = defaultChunkSize
	}
	if settings.TryTimes == 0 {
		settings.TryTimes = defaultTryTimes
	}
}
// tasks carries queued block-upload closures to the worker pool.
var tasks chan func()

// worker drains the task channel, executing closures until it is closed.
// (range exits cleanly on close; the previous bare receive loop would have
// invoked a nil func and panicked in that case.)
func worker(tasks chan func()) {
	for task := range tasks {
		task()
	}
}

// initWorkers creates the task queue and worker goroutines; it is invoked
// exactly once via sync.Once from rput.
func initWorkers() {
	tasks = make(chan func(), settings.TaskQsize)
	for i := 0; i < settings.Workers; i++ {
		go worker(tasks)
	}
}
// Default no-op callbacks used when the caller does not supply block
// completion / failure notifiers.
func notifyNil(blkIdx int, blkSize int, ret *BlkputRet) {}
func notifyErrNil(blkIdx int, blkSize int, err error)  {}
// Block geometry: blocks are fixed at 4MB (1 << blockBits bytes).
const (
	blockBits = 22
	blockMask = (1 << blockBits) - 1
)

// BlockCount returns how many fixed-size blocks are needed to hold fsize
// bytes (rounding up; 0 bytes need 0 blocks).
func BlockCount(fsize int64) int {
	blocks := (fsize + blockMask) >> blockBits
	return int(blocks)
}
// BlkputRet is the reply of each chunk upload (mkblk/bput) within a block.
type BlkputRet struct {
	Ctx       string `json:"ctx"`        // block context, required by bput and mkfile
	Checksum  string `json:"checksum"`
	Crc32     uint32 `json:"crc32"`      // crc32 reported by the server for the chunk
	Offset    uint32 `json:"offset"`     // acknowledged offset within the block
	Host      string `json:"host"`       // host to use for subsequent bput calls
	ExpiredAt int64  `json:"expired_at"` // ctx expiry time, unix seconds
}

// RputExtra holds the optional settings of a resumable upload.
type RputExtra struct {
	Params     map[string]string // optional; user vars — names must start with "x:" and values be non-empty, otherwise ignored
	MimeType   string            // optional
	ChunkSize  int               // optional; bytes uploaded per bput
	TryTimes   int               // optional; retries per failed chunk
	Progresses []BlkputRet       // optional; resume progress, one entry per block
	Notify     func(blkIdx int, blkSize int, ret *BlkputRet) // optional; progress callback — note blocks upload in parallel
	NotifyErr  func(blkIdx int, blkSize int, err error)
}
// once guards the lazy, one-time initialization of the shared worker pool.
var once sync.Once
// Put uploads a file with resumable, block-based upload.
//
// ctx is the request context.
// ret receives the reply; it is PutRet unless the token sets CallbackUrl or
// ReturnBody.
// uptoken is the upload token issued by the business server.
// key is the object's access path (e.g. "foo/bar.jpg"); avoid a leading '/',
// and the empty string is valid.
// f provides the content; io.ReaderAt (not io.Reader) is required so blocks
// can be read concurrently and uploads resumed.
// fsize is the file size in bytes.
// extra holds optional settings; see RputExtra.
func (p *ResumeUploader) Put(ctx context.Context, ret interface{}, uptoken string, key string, f io.ReaderAt,
	fsize int64, extra *RputExtra) (err error) {
	return p.rput(ctx, ret, uptoken, key, true, f, fsize, extra)
}
// PutWithoutKey uploads a file resumably without an explicit key. The stored
// key follows the token's saveKey rule when present, otherwise the content
// hash is used. See Put for the remaining parameter semantics.
func (p *ResumeUploader) PutWithoutKey(
	ctx context.Context, ret interface{}, uptoken string, f io.ReaderAt, fsize int64, extra *RputExtra) (err error) {
	return p.rput(ctx, ret, uptoken, "", false, f, fsize, extra)
}
// PutFile resumably uploads the file at localFile under key. It is Put with
// a local path instead of an io.ReaderAt; see Put for the other parameters.
func (p *ResumeUploader) PutFile(
	ctx context.Context, ret interface{}, uptoken, key, localFile string, extra *RputExtra) (err error) {
	return p.rputFile(ctx, ret, uptoken, key, true, localFile, extra)
}
// PutFileWithoutKey resumably uploads the file at localFile without an
// explicit key: the token's saveKey rule applies when set, otherwise the
// content hash becomes the key. It is PutWithoutKey with a local path
// instead of an io.ReaderAt.
func (p *ResumeUploader) PutFileWithoutKey(
	ctx context.Context, ret interface{}, uptoken, localFile string, extra *RputExtra) (err error) {
	err = p.rputFile(ctx, ret, uptoken, "", false, localFile, extra)
	return
}
// rput drives a concurrent resumable upload: it normalizes extra, resolves
// the upload host from the token, fans the file's blocks out to the shared
// worker pool, waits for them all, and finishes with Mkfile.
//
// The failure counter is mutex-guarded: worker goroutines run tasks in
// parallel, so the previous unsynchronized nfails++ was a data race.
func (p *ResumeUploader) rput(
	ctx context.Context, ret interface{}, uptoken string,
	key string, hasKey bool, f io.ReaderAt, fsize int64, extra *RputExtra) (err error) {
	once.Do(initWorkers)
	log := xlog.NewWith(ctx)
	blockCnt := BlockCount(fsize)
	if extra == nil {
		extra = new(RputExtra)
	}
	// resume progress must match the block count exactly
	if extra.Progresses == nil {
		extra.Progresses = make([]BlkputRet, blockCnt)
	} else if len(extra.Progresses) != blockCnt {
		return ErrInvalidPutProgress
	}
	if extra.ChunkSize == 0 {
		extra.ChunkSize = settings.ChunkSize
	}
	if extra.TryTimes == 0 {
		extra.TryTimes = settings.TryTimes
	}
	if extra.Notify == nil {
		extra.Notify = notifyNil
	}
	if extra.NotifyErr == nil {
		extra.NotifyErr = notifyErrNil
	}
	// resolve the upload host from the token's access key and bucket
	ak, bucket, gErr := getAkBucketFromUploadToken(uptoken)
	if gErr != nil {
		err = gErr
		return
	}
	upHost, gErr := p.upHost(ak, bucket)
	if gErr != nil {
		err = gErr
		return
	}
	var wg sync.WaitGroup
	wg.Add(blockCnt)
	last := blockCnt - 1
	blkSize := 1 << blockBits
	// nfails is written by concurrent worker goroutines — guard it
	var (
		failMu sync.Mutex
		nfails int
	)
	// NOTE(review): this rewires the (possibly shared) client's transport
	// with this upload's token — verify uploads never share a client.
	p.client = newUptokenClient(p.client, uptoken)
	for i := 0; i < blockCnt; i++ {
		blkIdx := i
		blkSize1 := blkSize
		if i == last {
			// the final block may be shorter than 4MB
			offbase := int64(blkIdx) << blockBits
			blkSize1 = int(fsize - offbase)
		}
		task := func() {
			defer wg.Done()
			tryTimes := extra.TryTimes
		lzRetry:
			err := p.resumableBput(ctx, upHost, &extra.Progresses[blkIdx], f, blkIdx, blkSize1, extra)
			if err != nil {
				if tryTimes > 1 {
					tryTimes--
					log.Info("resumable.Put retrying ...", blkIdx, "reason:", err)
					goto lzRetry
				}
				log.Warn("resumable.Put", blkIdx, "failed:", err)
				extra.NotifyErr(blkIdx, blkSize1, err)
				failMu.Lock()
				nfails++
				failMu.Unlock()
			}
		}
		tasks <- task
	}
	wg.Wait()
	if nfails != 0 {
		return ErrPutFailed
	}
	return p.Mkfile(ctx, upHost, ret, key, hasKey, fsize, extra)
}
// rputFile opens localFile, determines its size, and hands it to rput.
// The file handle is closed when the upload finishes.
func (p *ResumeUploader) rputFile(
	ctx context.Context, ret interface{}, uptoken string,
	key string, hasKey bool, localFile string, extra *RputExtra) (err error) {
	f, openErr := os.Open(localFile)
	if openErr != nil {
		return openErr
	}
	defer f.Close()
	fi, statErr := f.Stat()
	if statErr != nil {
		return statErr
	}
	return p.rput(ctx, ret, uptoken, key, hasKey, f, fi.Size(), extra)
}
// upHost resolves the upload host for the given access key and bucket.
// A zone explicitly set in the config wins; otherwise the zone is queried
// via GetZone. UseHTTPS and UseCdnDomains select the scheme and host set.
func (p *ResumeUploader) upHost(ak, bucket string) (upHost string, err error) {
	zone := p.cfg.Zone
	if zone == nil {
		zone, err = GetZone(ak, bucket)
		if err != nil {
			return
		}
	}
	scheme := "http://"
	if p.cfg.UseHTTPS {
		scheme = "https://"
	}
	host := zone.SrcUpHosts[0]
	if p.cfg.UseCdnDomains {
		host = zone.CdnUpHosts[0]
	}
	upHost = scheme + host
	return
}

View File

@@ -0,0 +1,25 @@
package storage
import (
"context"
"fmt"
"math/rand"
"testing"
)
// TestResumeUploadPutFile uploads a local file with the resumable uploader
// under a random key and checks the call succeeds. Integration test: relies
// on package-level fixtures (testBucket, mac, resumeUploader, testLocalFile).
func TestResumeUploadPutFile(t *testing.T) {
	var putRet PutRet
	ctx := context.TODO()
	putPolicy := PutPolicy{
		Scope:           testBucket,
		DeleteAfterDays: 7, // auto-delete the test object after a week
	}
	upToken := putPolicy.UploadToken(mac)
	testKey := fmt.Sprintf("testRPutFileKey_%d", rand.Int())
	err := resumeUploader.PutFile(ctx, &putRet, upToken, testKey, testLocalFile, nil)
	if err != nil {
		t.Fatalf("ResumeUploader#PutFile() error, %s", err)
	}
	t.Logf("Key: %s, Hash:%s", putRet.Key, putRet.Hash)
}

73
vendor/github.com/qiniu/api.v7/storage/token.go generated vendored Normal file
View File

@@ -0,0 +1,73 @@
package storage
import (
"encoding/base64"
"encoding/json"
"errors"
"strings"
"time"
"github.com/qiniu/api.v7/auth/qbox"
)
// PutPolicy is the upload policy signed into an upload token.
type PutPolicy struct {
	Scope               string `json:"scope"`
	Expires             uint32 `json:"deadline"` // lifetime in seconds before signing; UploadToken turns it into an absolute deadline
	IsPrefixalScope     int    `json:"isPrefixalScope,omitempty"`
	InsertOnly          uint16 `json:"insertOnly,omitempty"` // if non-zero, insert-only even when Scope has the "bucket:key" form
	DetectMime          uint8  `json:"detectMime,omitempty"` // if non-zero, the server determines the MimeType from the content
	FsizeLimit          int64  `json:"fsizeLimit,omitempty"`
	MimeLimit           string `json:"mimeLimit,omitempty"`
	SaveKey             string `json:"saveKey,omitempty"`
	CallbackFetchKey    uint8  `json:"callbackFetchKey,omitempty"`
	CallbackURL         string `json:"callbackUrl,omitempty"`
	CallbackHost        string `json:"callbackHost,omitempty"`
	CallbackBody        string `json:"callbackBody,omitempty"`
	CallbackBodyType    string `json:"callbackBodyType,omitempty"`
	ReturnURL           string `json:"returnUrl,omitempty"`
	ReturnBody          string `json:"returnBody,omitempty"`
	PersistentOps       string `json:"persistentOps,omitempty"`
	PersistentNotifyURL string `json:"persistentNotifyUrl,omitempty"`
	PersistentPipeline  string `json:"persistentPipeline,omitempty"`
	EndUser             string `json:"endUser,omitempty"`
	DeleteAfterDays     int    `json:"deleteAfterDays,omitempty"`
	FileType            int    `json:"fileType,omitempty"`
}
// UploadToken generates the upload credential for this policy, signed with
// mac. An Expires of 0 defaults to 3600 seconds (1 hour).
//
// The deadline is computed on a copy of the policy, so the receiver is no
// longer mutated: previously p.Expires was overwritten with the absolute
// deadline, and a second UploadToken call on the same policy added "now"
// again, producing a far-future (or overflowed) deadline.
func (p *PutPolicy) UploadToken(mac *qbox.Mac) (token string) {
	policy := *p
	if policy.Expires == 0 {
		policy.Expires = 3600 // 1 hour
	}
	policy.Expires += uint32(time.Now().Unix())
	putPolicyJSON, _ := json.Marshal(&policy)
	token = mac.SignWithData(putPolicyJSON)
	return
}
func getAkBucketFromUploadToken(token string) (ak, bucket string, err error) {
items := strings.Split(token, ":")
if len(items) != 3 {
err = errors.New("invalid upload token, format error")
return
}
ak = items[0]
policyBytes, dErr := base64.URLEncoding.DecodeString(items[2])
if dErr != nil {
err = errors.New("invalid upload token, invalid put policy")
return
}
putPolicy := PutPolicy{}
uErr := json.Unmarshal(policyBytes, &putPolicy)
if uErr != nil {
err = errors.New("invalid upload token, invalid put policy")
return
}
bucket = strings.Split(putPolicy.Scope, ":")[0]
return
}

22
vendor/github.com/qiniu/api.v7/storage/util.go generated vendored Normal file
View File

@@ -0,0 +1,22 @@
package storage
import (
"time"
)
// ParsePutTime 提供了将PutTime转换为 time.Time 的功能
func ParsePutTime(putTime int64) (t time.Time) {
t = time.Unix(0, putTime*100)
return
}
// IsContextExpired 检查分片上传的ctx是否过期提前一天让它过期
// 因为我们认为如果断点继续上传的话最长需要1天时间
func IsContextExpired(blkPut BlkputRet) bool {
if blkPut.Ctx == "" {
return false
}
target := time.Unix(blkPut.ExpiredAt, 0).AddDate(0, 0, -1)
now := time.Now()
return now.After(target)
}

190
vendor/github.com/qiniu/api.v7/storage/zone.go generated vendored Normal file
View File

@@ -0,0 +1,190 @@
package storage
import (
"context"
"fmt"
"strings"
"sync"
"github.com/qiniu/x/rpc.v7"
)
// Zone describes the machine room (region) serving a bucket: the upload
// hosts plus the resource-management and data-processing domains.
type Zone struct {
	SrcUpHosts []string // direct (source) upload hosts
	CdnUpHosts []string // CDN-accelerated upload hosts
	RsHost     string   // resource management host
	RsfHost    string   // resource listing host
	ApiHost    string   // data-processing api host
	IovipHost  string   // download io host
}

// String renders all hosts of the zone, one "Name: value" line each.
// (Single Sprintf instead of the previous six-step string concatenation;
// the output is byte-identical.)
func (z *Zone) String() string {
	return fmt.Sprintf(
		"SrcUpHosts: %v\nCdnUpHosts: %v\nIovipHost: %s\nRsHost: %s\nRsfHost: %s\nApiHost: %s\n",
		z.SrcUpHosts, z.CdnUpHosts, z.IovipHost, z.RsHost, z.RsfHost, z.ApiHost)
}
// ZoneHuadong is the East China region (z0).
var ZoneHuadong = Zone{
	SrcUpHosts: []string{
		"up.qiniup.com",
		"up-nb.qiniup.com",
		"up-xs.qiniup.com",
	},
	CdnUpHosts: []string{
		"upload.qiniup.com",
		"upload-nb.qiniup.com",
		"upload-xs.qiniup.com",
	},
	RsHost:    "rs.qiniu.com",
	RsfHost:   "rsf.qiniu.com",
	ApiHost:   "api.qiniu.com",
	IovipHost: "iovip.qbox.me",
}

// ZoneHuabei is the North China region (z1).
var ZoneHuabei = Zone{
	SrcUpHosts: []string{
		"up-z1.qiniup.com",
	},
	CdnUpHosts: []string{
		"upload-z1.qiniup.com",
	},
	RsHost:    "rs-z1.qiniu.com",
	RsfHost:   "rsf-z1.qiniu.com",
	ApiHost:   "api-z1.qiniu.com",
	IovipHost: "iovip-z1.qbox.me",
}

// ZoneHuanan is the South China region (z2).
var ZoneHuanan = Zone{
	SrcUpHosts: []string{
		"up-z2.qiniup.com",
		"up-gz.qiniup.com",
		"up-fs.qiniup.com",
	},
	CdnUpHosts: []string{
		"upload-z2.qiniup.com",
		"upload-gz.qiniup.com",
		"upload-fs.qiniup.com",
	},
	RsHost:    "rs-z2.qiniu.com",
	RsfHost:   "rsf-z2.qiniu.com",
	ApiHost:   "api-z2.qiniu.com",
	IovipHost: "iovip-z2.qbox.me",
}

// ZoneBeimei is the North America region (na0).
var ZoneBeimei = Zone{
	SrcUpHosts: []string{
		"up-na0.qiniu.com",
	},
	CdnUpHosts: []string{
		"upload-na0.qiniu.com",
	},
	RsHost:    "rs-na0.qiniu.com",
	RsfHost:   "rsf-na0.qiniu.com",
	ApiHost:   "api-na0.qiniu.com",
	IovipHost: "iovip-na0.qbox.me",
}

// Aliases named after the region codes used by the API.
var Zone_z0 = ZoneHuadong
var Zone_z1 = ZoneHuabei
var Zone_z2 = ZoneHuanan
var Zone_na0 = ZoneBeimei

// UcHost is the API endpoint for querying a bucket's zone-related domains.
const UcHost = "https://uc.qbox.me"
// UcQueryRet is the reply of the uc /v2/query request.
type UcQueryRet struct {
	TTL int                            `json:"ttl"` // cache lifetime suggested by the server (unused here)
	Io  map[string]map[string][]string `json:"io"`  // download hosts, e.g. Io["src"]["main"]
	Up  map[string]UcQueryUp           `json:"up"`  // upload hosts, keyed "src" (direct) and "acc" (CDN)
}

// UcQueryUp is the upload host information in the query reply.
type UcQueryUp struct {
	Main   []string `json:"main,omitempty"`   // primary hosts
	Backup []string `json:"backup,omitempty"` // backup hosts, appended after Main
	Info   string   `json:"info,omitempty"`
}
var (
	// zoneMutext guards zoneCache. (Name kept as-is — "Mutext" is a typo,
	// but renaming would touch every reference in this package.)
	zoneMutext sync.RWMutex
	// zoneCache maps "<ak>:<bucket>" to its resolved Zone.
	zoneCache = make(map[string]*Zone)
)
// GetZone resolves the zone (machine room) information for an access key
// and bucket, caching results per "<ak>:<bucket>".
func GetZone(ak, bucket string) (zone *Zone, err error) {
	zoneID := fmt.Sprintf("%s:%s", ak, bucket)
	// fast path: cache hit
	zoneMutext.RLock()
	if v, ok := zoneCache[zoneID]; ok {
		zone = v
	}
	zoneMutext.RUnlock()
	if zone != nil {
		return
	}
	// slow path: ask the uc service
	reqURL := fmt.Sprintf("%s/v2/query?ak=%s&bucket=%s", UcHost, ak, bucket)
	var ret UcQueryRet
	ctx := context.Background()
	qErr := rpc.DefaultClient.CallWithForm(ctx, &ret, "GET", reqURL, nil)
	if qErr != nil {
		err = fmt.Errorf("query zone error, %s", qErr.Error())
		return
	}
	// validate the reply shape before indexing; a malformed reply previously
	// caused an index-out-of-range panic here
	ioMain := ret.Io["src"]["main"]
	if len(ioMain) == 0 {
		err = fmt.Errorf("query zone error, empty io host list")
		return
	}
	ioHost := ioMain[0]
	srcUpHosts := ret.Up["src"].Main
	if ret.Up["src"].Backup != nil {
		srcUpHosts = append(srcUpHosts, ret.Up["src"].Backup...)
	}
	cdnUpHosts := ret.Up["acc"].Main
	if ret.Up["acc"].Backup != nil {
		cdnUpHosts = append(cdnUpHosts, ret.Up["acc"].Backup...)
	}
	zone = &Zone{
		SrcUpHosts: srcUpHosts,
		CdnUpHosts: cdnUpHosts,
		IovipHost:  ioHost,
		RsHost:     DefaultRsHost,
		RsfHost:    DefaultRsfHost,
		ApiHost:    DefaultAPIHost,
	}
	// override rs/rsf/api hosts for non-default regions
	setSpecificHosts(ioHost, zone)
	zoneMutext.Lock()
	zoneCache[zoneID] = zone
	zoneMutext.Unlock()
	return
}
// setSpecificHosts overrides the rs/rsf/api hosts in zone when the io host
// indicates a non-default region (z1, z2 or na0); otherwise zone is left
// untouched.
func setSpecificHosts(ioHost string, zone *Zone) {
	switch {
	case strings.Contains(ioHost, "-z1"):
		zone.RsHost = "rs-z1.qiniu.com"
		zone.RsfHost = "rsf-z1.qiniu.com"
		zone.ApiHost = "api-z1.qiniu.com"
	case strings.Contains(ioHost, "-z2"):
		zone.RsHost = "rs-z2.qiniu.com"
		zone.RsfHost = "rsf-z2.qiniu.com"
		zone.ApiHost = "api-z2.qiniu.com"
	case strings.Contains(ioHost, "-na0"):
		zone.RsHost = "rs-na0.qiniu.com"
		zone.RsfHost = "rsf-na0.qiniu.com"
		zone.ApiHost = "api-na0.qiniu.com"
	}
}