add vendor

This commit is contained in:
deepzz0
2017-02-18 15:23:57 +08:00
parent 4ebbc38cc0
commit 562f4d86c6
1371 changed files with 369552 additions and 2 deletions
+68
View File
@@ -0,0 +1,68 @@
package txn
import (
mrand "math/rand"
"time"
)
// chaosEnabled is a fast gate checked on every chaos() call. It is true
// only while a Chaos with a non-zero kill or slowdown chance is
// installed via SetChaos.
var chaosEnabled = false

// chaosSetting holds the currently installed failure-injection parameters.
var chaosSetting Chaos
// Chaos holds parameters for the failure injection mechanism.
type Chaos struct {
	// KillChance is the 0.0 to 1.0 chance that a given checkpoint
	// within the algorithm will raise an interruption that will
	// stop the procedure.
	KillChance float64

	// SlowdownChance is the 0.0 to 1.0 chance that a given checkpoint
	// within the algorithm will be delayed by Slowdown before
	// continuing.
	SlowdownChance float64
	// Slowdown is how long a checkpoint sleeps when SlowdownChance triggers.
	Slowdown time.Duration

	// If Breakpoint is set, the above settings will only affect the
	// named breakpoint.
	Breakpoint string
}
// SetChaos sets the failure injection parameters to c.
func SetChaos(c Chaos) {
	enabled := c.KillChance > 0 || c.SlowdownChance > 0
	chaosSetting = c
	chaosEnabled = enabled
}
// chaos is a checkpoint placed throughout the flush algorithm. When
// chaos is enabled and the current setting applies to bpname (an empty
// Breakpoint matches every checkpoint), it may panic with chaosError
// (killing the procedure mid-flight) or sleep for the configured
// Slowdown duration.
func chaos(bpname string) {
	if !chaosEnabled {
		return
	}
	switch chaosSetting.Breakpoint {
	case "", bpname:
		kc := chaosSetting.KillChance
		if kc > 0 && mrand.Intn(1000) < int(kc*1000) {
			panic(chaosError{})
		}
		if bpname == "insert" {
			// The "insert" checkpoint is only ever killed, never
			// slowed down.
			return
		}
		sc := chaosSetting.SlowdownChance
		if sc > 0 && mrand.Intn(1000) < int(sc*1000) {
			time.Sleep(chaosSetting.Slowdown)
		}
	}
}
// chaosError is the panic value raised by chaos checkpoints; it is
// recovered by handleChaos and translated into ErrChaos.
type chaosError struct{}
// handleChaos is installed via defer. It recovers a panic raised by a
// chaos checkpoint, turning it into ErrChaos on *err; any other panic
// value is re-raised untouched.
func (f *flusher) handleChaos(err *error) {
	switch v := recover().(type) {
	case nil:
		// No panic in flight; nothing to do.
	case chaosError:
		f.debugf("Killed by chaos!")
		*err = ErrChaos
	default:
		panic(v)
	}
}
+109
View File
@@ -0,0 +1,109 @@
package txn
import (
"bytes"
"fmt"
"sort"
"sync/atomic"
"gopkg.in/mgo.v2/bson"
)
var (
	// debugEnabled gates debugf output; toggled via SetDebug.
	debugEnabled bool
	// logger receives all log output; set via SetLogger. A nil logger
	// silently discards messages.
	logger log_Logger
)

// log_Logger is the subset of *log.Logger this package needs, so any
// compatible implementation can be plugged in.
type log_Logger interface {
	Output(calldepth int, s string) error
}
// SetLogger specifies the *log.Logger (or compatible implementation)
// where logged messages should be sent to. A nil value disables logging.
func SetLogger(l log_Logger) {
	logger = l
}
// SetDebug enables or disables debugging. Debug messages are emitted
// only when a logger has also been registered via SetLogger.
func SetDebug(debug bool) {
	debugEnabled = debug
}
// ErrChaos is returned by operations interrupted by the chaos
// failure-injection mechanism (see SetChaos).
var ErrChaos = fmt.Errorf("interrupted by chaos")
// debugId is an atomically incremented counter; each flush gets the
// next value so its log lines can be told apart.
var debugId uint32

// debugPrefix returns a short unique prefix ("a) ", "b) ", ..., "ab) ")
// derived from the next debugId, encoding the id four bits at a time
// as letters from 'a' to 'p', least-significant nibble first.
func debugPrefix() string {
	id := atomic.AddUint32(&debugId, 1) - 1
	buf := make([]byte, 0, 10)
	for shift := uint(0); shift < 32; shift += 4 {
		buf = append(buf, "abcdefghijklmnop"[(id>>shift)&0xf])
		if id>>(shift+4) == 0 {
			break
		}
	}
	return string(append(buf, ')', ' '))
}
// logf sends a formatted message to the registered logger, if any.
// Arguments are first normalized by argsForLog for readability.
func logf(format string, args ...interface{}) {
	if logger != nil {
		logger.Output(2, fmt.Sprintf(format, argsForLog(args)...))
	}
}
// debugf is logf gated on the debugEnabled flag; it emits nothing
// unless both debugging is on and a logger is registered.
func debugf(format string, args ...interface{}) {
	if debugEnabled && logger != nil {
		logger.Output(2, fmt.Sprintf(format, argsForLog(args)...))
	}
}
// argsForLog rewrites, in place, the known mgo/txn value types in args
// into compact readable strings (hex object ids, sorted per-document
// maps) before they reach fmt. Values of other types pass through
// unchanged, and the same slice is returned for convenience.
//
// Fix: the inner `for i, dkey := range dkeys` loops shadowed the outer
// args index `i`; the inner index is now named `k` so `args[i]` below
// each loop unambiguously refers to the argument being rewritten.
func argsForLog(args []interface{}) []interface{} {
	for i, arg := range args {
		switch v := arg.(type) {
		case bson.ObjectId:
			args[i] = v.Hex()
		case []bson.ObjectId:
			lst := make([]string, len(v))
			for j, id := range v {
				lst[j] = id.Hex()
			}
			args[i] = lst
		case map[docKey][]bson.ObjectId:
			buf := &bytes.Buffer{}
			var dkeys docKeys
			for dkey := range v {
				dkeys = append(dkeys, dkey)
			}
			// Sort keys so the output is deterministic.
			sort.Sort(dkeys)
			for k, dkey := range dkeys {
				if k > 0 {
					buf.WriteByte(' ')
				}
				buf.WriteString(fmt.Sprintf("%v: {", dkey))
				for j, id := range v[dkey] {
					if j > 0 {
						buf.WriteByte(' ')
					}
					buf.WriteString(id.Hex())
				}
				buf.WriteByte('}')
			}
			args[i] = buf.String()
		case map[docKey][]int64:
			buf := &bytes.Buffer{}
			var dkeys docKeys
			for dkey := range v {
				dkeys = append(dkeys, dkey)
			}
			sort.Sort(dkeys)
			for k, dkey := range dkeys {
				if k > 0 {
					buf.WriteByte(' ')
				}
				buf.WriteString(fmt.Sprintf("%v: %v", dkey, v[dkey]))
			}
			args[i] = buf.String()
		}
	}
	return args
}
+205
View File
@@ -0,0 +1,205 @@
package txn
import (
"sort"
. "gopkg.in/check.v1"
)
// DocKeySuite hosts the docKeys sorting tests.
type DocKeySuite struct{}

var _ = Suite(&DocKeySuite{})
// The T* structs below are value fixtures for docKeysTests. Most share
// the same {A int; B string} layout on purpose, so the tests can check
// how keys holding distinct but structurally similar struct values are
// ordered relative to each other.
type T struct {
	A int
	B string
}

type T2 struct {
	A int
	B string
}

type T3 struct {
	A int
	B string
}

type T4 struct {
	A int
	B string
}

// T5 has T's layout but different field names.
type T5 struct {
	F int
	Q string
}

type T6 struct {
	A int
	B string
}

// T7 uses different field types from the others.
type T7 struct {
	A bool
	B float64
}

type T8 struct {
	A int
	B string
}

// T9 carries an extra field compared to T8.
type T9 struct {
	A int
	B string
	C bool
}

// T10 renames its fields to match T11's values via bson tags.
type T10 struct {
	C int    `bson:"a"`
	D string `bson:"b,omitempty"`
}

type T11 struct {
	C int
	D string
}

type T12 struct {
	S string
}

// T13 surrounds its exported field with unexported ones.
type T13 struct {
	p, q, r bool
	S       string
}
// docKeysTests holds pairs of docKeys slices: the first slice of each
// pair is the unsorted input and the second is the order expected after
// sort.Sort. The cases cover numeric, string, float, bool, and struct
// ids, mixed struct types, mixed collection names, duplicates, and
// bson-tagged fields.
var docKeysTests = [][]docKeys{
	{{
		{"c", 1},
		{"c", 5},
		{"c", 2},
	}, {
		{"c", 1},
		{"c", 2},
		{"c", 5},
	}}, {{
		{"c", "foo"},
		{"c", "bar"},
		{"c", "bob"},
	}, {
		{"c", "bar"},
		{"c", "bob"},
		{"c", "foo"},
	}}, {{
		{"c", 0.2},
		{"c", 0.07},
		{"c", 0.9},
	}, {
		{"c", 0.07},
		{"c", 0.2},
		{"c", 0.9},
	}}, {{
		{"c", true},
		{"c", false},
		{"c", true},
	}, {
		{"c", false},
		{"c", true},
		{"c", true},
	}}, {{
		{"c", T{1, "b"}},
		{"c", T{1, "a"}},
		{"c", T{0, "b"}},
		{"c", T{0, "a"}},
	}, {
		{"c", T{0, "a"}},
		{"c", T{0, "b"}},
		{"c", T{1, "a"}},
		{"c", T{1, "b"}},
	}}, {{
		{"c", T{1, "a"}},
		{"c", T{0, "a"}},
	}, {
		{"c", T{0, "a"}},
		{"c", T{1, "a"}},
	}}, {{
		{"c", T3{0, "b"}},
		{"c", T2{1, "b"}},
		{"c", T3{1, "a"}},
		{"c", T2{0, "a"}},
	}, {
		{"c", T2{0, "a"}},
		{"c", T3{0, "b"}},
		{"c", T3{1, "a"}},
		{"c", T2{1, "b"}},
	}}, {{
		{"c", T5{1, "b"}},
		{"c", T4{1, "b"}},
		{"c", T5{0, "a"}},
		{"c", T4{0, "a"}},
	}, {
		{"c", T4{0, "a"}},
		{"c", T5{0, "a"}},
		{"c", T4{1, "b"}},
		{"c", T5{1, "b"}},
	}}, {{
		{"c", T6{1, "b"}},
		{"c", T7{true, 0.2}},
		{"c", T6{0, "a"}},
		{"c", T7{false, 0.04}},
	}, {
		{"c", T6{0, "a"}},
		{"c", T6{1, "b"}},
		{"c", T7{false, 0.04}},
		{"c", T7{true, 0.2}},
	}}, {{
		{"c", T9{1, "b", true}},
		{"c", T8{1, "b"}},
		{"c", T9{0, "a", false}},
		{"c", T8{0, "a"}},
	}, {
		{"c", T9{0, "a", false}},
		{"c", T8{0, "a"}},
		{"c", T9{1, "b", true}},
		{"c", T8{1, "b"}},
	}}, {{
		{"b", 2},
		{"a", 5},
		{"c", 2},
		{"b", 1},
	}, {
		{"a", 5},
		{"b", 1},
		{"b", 2},
		{"c", 2},
	}}, {{
		{"c", T11{1, "a"}},
		{"c", T11{1, "a"}},
		{"c", T10{1, "a"}},
	}, {
		{"c", T10{1, "a"}},
		{"c", T11{1, "a"}},
		{"c", T11{1, "a"}},
	}}, {{
		{"c", T12{"a"}},
		{"c", T13{false, true, false, "a"}},
		{"c", T12{"b"}},
		{"c", T13{false, true, false, "b"}},
	}, {
		{"c", T12{"a"}},
		{"c", T13{false, true, false, "a"}},
		{"c", T12{"b"}},
		{"c", T13{false, true, false, "b"}},
	}},
}
// TestSort checks that sort.Sort on docKeys produces the expected
// order for every input/expected pair in docKeysTests.
func (s *DocKeySuite) TestSort(c *C) {
	for _, test := range docKeysTests {
		keys := test[0]
		expected := test[1]
		sort.Sort(keys)
		c.Check(keys, DeepEquals, expected)
	}
}
+985
View File
@@ -0,0 +1,985 @@
package txn
import (
"fmt"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
// flush creates a flusher for the goal transaction t and runs it,
// driving t — and any transactions it depends on via shared document
// keys — toward completion.
func flush(r *Runner, t *transaction) error {
	f := &flusher{
		Runner:   r,
		goal:     t,
		goalKeys: make(map[docKey]bool),
		queue:    make(map[docKey][]token),
		debugId:  debugPrefix(),
	}
	for _, dkey := range f.goal.docKeys() {
		f.goalKeys[dkey] = true
	}
	return f.run()
}
// flusher holds the state of one flush operation.
type flusher struct {
	*Runner
	goal     *transaction       // the transaction being flushed
	goalKeys map[docKey]bool    // document keys touched by goal
	queue    map[docKey][]token // txn-queue observed for each document
	debugId  string             // log prefix to correlate this flush's messages
}
// run processes the goal transaction and, transitively, every
// transaction sharing documents with it. Simple cases are handled
// entirely by recurse; otherwise a successor graph is built from the
// observed txn-queues and Tarjan's strongly-connected-components sort
// decides a globally agreed application order.
func (f *flusher) run() (err error) {
	if chaosEnabled {
		defer f.handleChaos(&err)
	}

	f.debugf("Processing %s", f.goal)
	seen := make(map[bson.ObjectId]*transaction)
	if err := f.recurse(f.goal, seen); err != nil {
		return err
	}
	if f.goal.done() {
		return nil
	}

	// Sparse workloads will generally be managed entirely by recurse.
	// Getting here means one or more transactions have dependencies
	// and perhaps cycles.

	// Build successors data for Tarjan's sort. Must consider
	// that entries in txn-queue are not necessarily valid.
	successors := make(map[bson.ObjectId][]bson.ObjectId)
	ready := true
	for _, dqueue := range f.queue {
	NextPair:
		for i := 0; i < len(dqueue); i++ {
			pred := dqueue[i]
			predid := pred.id()
			predt := seen[predid]
			// Skip stale queue entries whose nonce no longer matches
			// the loaded transaction.
			if predt == nil || predt.Nonce != pred.nonce() {
				continue
			}
			predsuccids, ok := successors[predid]
			if !ok {
				successors[predid] = nil
			}

			for j := i + 1; j < len(dqueue); j++ {
				succ := dqueue[j]
				succid := succ.id()
				succt := seen[succid]
				if succt == nil || succt.Nonce != succ.nonce() {
					continue
				}
				if _, ok := successors[succid]; !ok {
					successors[succid] = nil
				}

				// Found a valid pred/succ pair.
				i = j - 1
				for _, predsuccid := range predsuccids {
					if predsuccid == succid {
						continue NextPair
					}
				}
				successors[predid] = append(predsuccids, succid)
				if succid == f.goal.Id {
					// There are still pre-requisites to handle.
					ready = false
				}
				continue NextPair
			}
		}
	}
	f.debugf("Queues: %v", f.queue)
	f.debugf("Successors: %v", successors)
	if ready {
		f.debugf("Goal %s has no real pre-requisites", f.goal)
		return f.advance(f.goal, nil, true)
	}

	// Robert Tarjan's algorithm for detecting strongly-connected
	// components is used for topological sorting and detecting
	// cycles at once. The order in which transactions are applied
	// in commonly affected documents must be a global agreement.
	sorted := tarjanSort(successors)
	if debugEnabled {
		f.debugf("Tarjan output: %v", sorted)
	}
	pull := make(map[bson.ObjectId]*transaction)
	for i := len(sorted) - 1; i >= 0; i-- {
		scc := sorted[i]
		f.debugf("Flushing %v", scc)
		if len(scc) == 1 {
			pull[scc[0]] = seen[scc[0]]
		}
		for _, id := range scc {
			if err := f.advance(seen[id], pull, true); err != nil {
				return err
			}
		}
		if len(scc) > 1 {
			// Whole cycle applied; only now may its tokens be pulled.
			for _, id := range scc {
				pull[id] = seen[id]
			}
		}
	}
	return nil
}
// recurse advances t without forcing, then transitively visits every
// transaction found queued on the documents t touches. seen prevents
// re-visiting transactions (and loops on queue cycles). Returns nil
// both on success and when t still has pre-requisites, which run then
// resolves via the successor graph.
func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error {
	seen[t.Id] = t
	err := f.advance(t, nil, false)
	if err != errPreReqs {
		// Either fully advanced (nil) or a hard failure; stop here.
		return err
	}
	for _, dkey := range t.docKeys() {
		for _, dtt := range f.queue[dkey] {
			id := dtt.id()
			if seen[id] != nil {
				continue
			}
			qt, err := f.load(id)
			if err != nil {
				return err
			}
			err = f.recurse(qt, seen)
			if err != nil {
				return err
			}
		}
	}
	return nil
}
// advance drives transaction t through its state machine until it
// reaches a terminal state (tapplied/taborted) or an error occurs.
// pull carries transactions whose leftover tokens may be removed from
// document queues; force makes prepare/rescan proceed even when t has
// pre-requisite transactions.
//
// Fix: removed the trailing `panic("unreachable")`. The for loop only
// exits via return or panic (every switch arm returns, continues, or
// panics), so the statement after it was dead code flagged by vet's
// unreachable check.
func (f *flusher) advance(t *transaction, pull map[bson.ObjectId]*transaction, force bool) error {
	for {
		switch t.State {
		case tpreparing, tprepared:
			revnos, err := f.prepare(t, force)
			if err != nil {
				return err
			}
			if t.State != tprepared {
				// State changed underneath us; loop to re-dispatch.
				continue
			}
			if err = f.assert(t, revnos, pull); err != nil {
				return err
			}
			if t.State != tprepared {
				continue
			}
			if err = f.checkpoint(t, revnos); err != nil {
				return err
			}
		case tapplying:
			return f.apply(t, pull)
		case taborting:
			return f.abortOrReload(t, nil, pull)
		case tapplied, taborted:
			return nil
		default:
			panic(fmt.Errorf("transaction in unknown state: %q", t.State))
		}
	}
}
// stash labels the relationship between a stash entry and its live
// document. NOTE(review): the three constants below are not referenced
// within this chunk — confirm usage elsewhere in the package.
type stash string

const (
	stashStable stash = ""
	stashInsert stash = "insert"
	stashRemove stash = "remove"
)

// txnInfo is the projection of a document's transaction bookkeeping
// fields, read from either live documents or stash entries.
type txnInfo struct {
	Queue  []token       `bson:"txn-queue"`            // pending transaction tokens
	Revno  int64         `bson:"txn-revno,omitempty"`  // current revision number
	Insert bson.ObjectId `bson:"txn-insert,omitempty"` // txn currently inserting this doc
	Remove bson.ObjectId `bson:"txn-remove,omitempty"` // txn currently removing this doc
}
// stashState tracks the lifecycle of a stash document.
// NOTE(review): these constants are not referenced within this chunk —
// confirm usage elsewhere in the package.
type stashState string

const (
	stashNew       stashState = ""
	stashInserting stashState = "inserting"
)

// txnFields is the query projection used when only the transaction
// bookkeeping fields of a document are of interest.
var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {"txn-insert", 1}}

// errPreReqs is an internal sentinel returned by prepare/rescan when
// the transaction has pre-requisites queued ahead of it and force was
// false. It never escapes to callers of the package.
var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false")
// prepare injects t's id onto txn-queue for all affected documents
// and collects the current txn-queue and txn-revno values during
// the process. If the prepared txn-queue indicates that there are
// pre-requisite transactions to be applied and the force parameter
// is false, errPreReqs will be returned. Otherwise, the current
// tip revision numbers for all the documents are returned.
func (f *flusher) prepare(t *transaction, force bool) (revnos []int64, err error) {
	if t.State != tpreparing {
		// Already prepared (possibly by another runner); just refresh
		// the in-memory view.
		return f.rescan(t, force)
	}
	f.debugf("Preparing %s", t)

	// dkeys being sorted means stable iteration across all runners. This
	// isn't strictly required, but reduces the chances of cycles.
	dkeys := t.docKeys()

	revno := make(map[docKey]int64)
	info := txnInfo{}
	tt := tokenFor(t)
NextDoc:
	for _, dkey := range dkeys {
		change := mgo.Change{
			Update:    bson.D{{"$addToSet", bson.D{{"txn-queue", tt}}}},
			ReturnNew: true,
		}
		c := f.tc.Database.C(dkey.C)
		cquery := c.FindId(dkey.Id).Select(txnFields)

	RetryDoc:
		change.Upsert = false
		chaos("")
		if _, err := cquery.Apply(change, &info); err == nil {
			if info.Remove == "" {
				// Fast path, unless workload is insert/remove heavy.
				revno[dkey] = info.Revno
				f.queue[dkey] = info.Queue
				f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
				continue NextDoc
			} else {
				// Handle remove in progress before preparing it.
				if err := f.loadAndApply(info.Remove); err != nil {
					return nil, err
				}
				goto RetryDoc
			}
		} else if err != mgo.ErrNotFound {
			return nil, err
		}

		// Document missing. Use stash collection.
		change.Upsert = true
		chaos("")
		_, err := f.sc.FindId(dkey).Apply(change, &info)
		if err != nil {
			return nil, err
		}
		if info.Insert != "" {
			// Handle insert in progress before preparing it.
			if err := f.loadAndApply(info.Insert); err != nil {
				return nil, err
			}
			goto RetryDoc
		}

		// Must confirm stash is still in use and is the same one
		// prepared, since applying a remove overwrites the stash.
		docFound := false
		stashFound := false
		if err = c.FindId(dkey.Id).Select(txnFields).One(&info); err == nil {
			docFound = true
		} else if err != mgo.ErrNotFound {
			return nil, err
		} else if err = f.sc.FindId(dkey).One(&info); err == nil {
			stashFound = true
			if info.Revno == 0 {
				// Missing revno in the stash only happens when it
				// has been upserted, in which case it defaults to -1.
				// Txn-inserted documents get revno -1 while in the stash
				// for the first time, and -revno-1 == 2 when they go live.
				info.Revno = -1
			}
		} else if err != mgo.ErrNotFound {
			return nil, err
		}

		if docFound && info.Remove == "" || stashFound && info.Insert == "" {
			for _, dtt := range info.Queue {
				if dtt != tt {
					continue
				}
				// Found tt properly prepared.
				if stashFound {
					f.debugf("[B] Prepared document %v on stash with revno %d and queue: %v", dkey, info.Revno, info.Queue)
				} else {
					f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
				}
				revno[dkey] = info.Revno
				f.queue[dkey] = info.Queue
				continue NextDoc
			}
		}

		// The stash wasn't valid and tt got overwritten. Try again.
		f.unstashToken(tt, dkey)
		goto RetryDoc
	}

	// Save the prepared nonce onto t.
	nonce := tt.nonce()
	qdoc := bson.D{{"_id", t.Id}, {"s", tpreparing}}
	udoc := bson.D{{"$set", bson.D{{"s", tprepared}, {"n", nonce}}}}
	chaos("set-prepared")
	err = f.tc.Update(qdoc, udoc)
	if err == nil {
		t.State = tprepared
		t.Nonce = nonce
	} else if err == mgo.ErrNotFound {
		f.debugf("Can't save nonce of %s: LOST RACE", tt)
		if err := f.reload(t); err != nil {
			return nil, err
		} else if t.State == tpreparing {
			panic("can't save nonce yet transaction is still preparing")
		} else if t.State != tprepared {
			return t.Revnos, nil
		}
		// Someone else prepared it; adopt the nonce they recorded.
		tt = t.token()
	} else if err != nil {
		return nil, err
	}

	prereqs, found := f.hasPreReqs(tt, dkeys)
	if !found {
		// Must only happen when reloading above.
		return f.rescan(t, force)
	} else if prereqs && !force {
		f.debugf("Prepared queue with %s [has prereqs & not forced].", tt)
		return nil, errPreReqs
	}
	revnos = assembledRevnos(t.Ops, revno)
	if !prereqs {
		f.debugf("Prepared queue with %s [no prereqs]. Revnos: %v", tt, revnos)
	} else {
		f.debugf("Prepared queue with %s [forced] Revnos: %v", tt, revnos)
	}
	return revnos, nil
}
// unstashToken pulls tt from dkey's stash queue and then, best-effort,
// removes the stash document entirely when its queue became empty.
// Failures of the final Remove are deliberately ignored.
func (f *flusher) unstashToken(tt token, dkey docKey) error {
	qdoc := bson.D{{"_id", dkey}, {"txn-queue", tt}}
	udoc := bson.D{{"$pull", bson.D{{"txn-queue", tt}}}}
	chaos("")
	if err := f.sc.Update(qdoc, udoc); err == nil {
		chaos("")
		// NOTE(review): the empty bson.D here appears intended to match
		// stash docs whose txn-queue is now empty — confirm against the
		// stash schema.
		err = f.sc.Remove(bson.D{{"_id", dkey}, {"txn-queue", bson.D{}}})
	} else if err != mgo.ErrNotFound {
		return err
	}
	return nil
}
// rescan refreshes the in-memory view (per-document queues and revnos)
// of an already prepared transaction t by re-reading every affected
// document, falling back to its stash entry when the document is
// missing. Returns errPreReqs when t still has pre-requisites and
// force is false, or t.Revnos when t turns out to be past tprepared.
func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) {
	f.debugf("Rescanning %s", t)
	if t.State != tprepared {
		panic(fmt.Errorf("rescanning transaction in invalid state: %q", t.State))
	}

	// dkeys being sorted means stable iteration across all
	// runners. This isn't strictly required, but reduces the chances
	// of cycles.
	dkeys := t.docKeys()

	tt := t.token()
	if !force {
		prereqs, found := f.hasPreReqs(tt, dkeys)
		if found && prereqs {
			// Its state is already known.
			return nil, errPreReqs
		}
	}

	revno := make(map[docKey]int64)
	info := txnInfo{}
	for _, dkey := range dkeys {
		const retries = 3
		retry := -1

	RetryDoc:
		retry++
		c := f.tc.Database.C(dkey.C)
		if err := c.FindId(dkey.Id).Select(txnFields).One(&info); err == mgo.ErrNotFound {
			// Document is missing. Look in stash.
			chaos("")
			if err := f.sc.FindId(dkey).One(&info); err == mgo.ErrNotFound {
				// Stash also doesn't exist. Maybe someone applied it.
				if err := f.reload(t); err != nil {
					return nil, err
				} else if t.State != tprepared {
					return t.Revnos, err
				}
				// Not applying either.
				if retry < retries {
					// Retry since there might be an insert/remove race.
					goto RetryDoc
				}
				// Neither the doc nor the stash seem to exist.
				return nil, fmt.Errorf("cannot find document %v for applying transaction %s", dkey, t)
			} else if err != nil {
				return nil, err
			}
			// Stash found.
			if info.Insert != "" {
				// Handle insert in progress before assuming ordering is good.
				if err := f.loadAndApply(info.Insert); err != nil {
					return nil, err
				}
				goto RetryDoc
			}
			if info.Revno == 0 {
				// Missing revno in the stash means -1.
				info.Revno = -1
			}
		} else if err != nil {
			return nil, err
		} else if info.Remove != "" {
			// Handle remove in progress before assuming ordering is good.
			if err := f.loadAndApply(info.Remove); err != nil {
				return nil, err
			}
			goto RetryDoc
		}
		revno[dkey] = info.Revno

		found := false
		for _, id := range info.Queue {
			if id == tt {
				found = true
				break
			}
		}
		f.queue[dkey] = info.Queue
		if !found {
			// Rescanned transaction id was not in the queue. This could mean one
			// of three things:
			// 1) The transaction was applied and popped by someone else. This is
			//    the common case.
			// 2) We've read an out-of-date queue from the stash. This can happen
			//    when someone else was paused for a long while preparing another
			//    transaction for this document, and improperly upserted to the
			//    stash when unpaused (after someone else inserted the document).
			//    This is rare but possible.
			// 3) There's an actual bug somewhere, or outside interference. Worst
			//    possible case.
			f.debugf("Rescanned document %v misses %s in queue: %v", dkey, tt, info.Queue)
			err := f.reload(t)
			if t.State == tpreparing || t.State == tprepared {
				if retry < retries {
					// Case 2.
					goto RetryDoc
				}
				// Case 3.
				return nil, fmt.Errorf("cannot find transaction %s in queue for document %v", t, dkey)
			}
			// Case 1.
			return t.Revnos, err
		}
	}

	prereqs, found := f.hasPreReqs(tt, dkeys)
	if !found {
		panic("rescanning loop guarantees that this can't happen")
	} else if prereqs && !force {
		f.debugf("Rescanned queue with %s: has prereqs, not forced", tt)
		return nil, errPreReqs
	}
	revnos = assembledRevnos(t.Ops, revno)
	if !prereqs {
		f.debugf("Rescanned queue with %s: no prereqs, revnos: %v", tt, revnos)
	} else {
		f.debugf("Rescanned queue with %s: has prereqs, forced, revnos: %v", tt, revnos)
	}
	return revnos, nil
}
// assembledRevnos returns, for each op, the revision number its
// document is expected to be at when the op runs, simulating how each
// op advances its document's revno: updates bump it by one, removes
// negate it minus one (stashed), and inserts of a stashed document
// bring it back positive plus one. The revno map is mutated in place.
func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 {
	out := make([]int64, len(ops))
	for i, op := range ops {
		key := op.docKey()
		current := revno[key]
		out[i] = current
		if op.Insert != nil && current < 0 {
			revno[key] = -current + 1
		} else if op.Update != nil && current >= 0 {
			revno[key] = current + 1
		} else if op.Remove && current >= 0 {
			revno[key] = -current - 1
		}
	}
	return out
}
// hasPreReqs reports whether any of tt's document queues also contain
// tokens from other transactions (prereqs), and whether tt itself was
// present in every one of its document queues (found). Note the subtle
// control flow: `found = false` runs only when the inner loop finishes
// without hitting `continue NextDoc`, i.e. tt was absent from that
// document's queue.
func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) {
	found = true
NextDoc:
	for _, dkey := range dkeys {
		for _, dtt := range f.queue[dkey] {
			if dtt == tt {
				// tt is queued on this document; move to the next one.
				continue NextDoc
			} else if dtt.id() != tt.id() {
				// A token from a different transaction shares the queue.
				prereqs = true
			}
		}
		found = false
	}
	return
}
// reload refreshes t's State ("s"), Nonce ("n"), and Revnos ("r")
// fields from the transactions collection, discarding any stale
// in-memory view.
func (f *flusher) reload(t *transaction) error {
	var newt transaction
	query := f.tc.FindId(t.Id)
	query.Select(bson.D{{"s", 1}, {"n", 1}, {"r", 1}})
	if err := query.One(&newt); err != nil {
		return fmt.Errorf("failed to reload transaction: %v", err)
	}
	t.State = newt.State
	t.Nonce = newt.Nonce
	t.Revnos = newt.Revnos
	f.debugf("Reloaded %s: %q", t, t.State)
	return nil
}
// loadAndApply loads the transaction with the given id and forcefully
// advances it to completion. Used to clear inserts/removes found
// in-progress while preparing or rescanning other transactions.
func (f *flusher) loadAndApply(id bson.ObjectId) error {
	t, err := f.load(id)
	if err != nil {
		return err
	}
	return f.advance(t, nil, true)
}
// assert verifies that all assertions in t match the content that t
// will be applied upon. If an assertion fails, the transaction state
// is changed to aborted.
//
// Fix: the Insert-with-assert error used fmt.Errorf with an op.Assert
// argument but no corresponding verb, producing a "%!(EXTRA ...)"
// suffix in the message (flagged by go vet's printf check). The value
// is now rendered with %v.
func (f *flusher) assert(t *transaction, revnos []int64, pull map[bson.ObjectId]*transaction) error {
	f.debugf("Asserting %s with revnos %v", t, revnos)
	if t.State != tprepared {
		panic(fmt.Errorf("asserting transaction in invalid state: %q", t.State))
	}
	qdoc := make(bson.D, 3)
	revno := make(map[docKey]int64)
	for i, op := range t.Ops {
		dkey := op.docKey()
		// Only the first revno seen for a document is used for its
		// assertions.
		if _, ok := revno[dkey]; !ok {
			revno[dkey] = revnos[i]
		}
		if op.Assert == nil {
			continue
		}
		if op.Assert == DocMissing {
			// A non-negative revno means the document exists live.
			if revnos[i] >= 0 {
				return f.abortOrReload(t, revnos, pull)
			}
			continue
		}
		if op.Insert != nil {
			return fmt.Errorf("Insert can only Assert txn.DocMissing, not %v", op.Assert)
		}
		// if revnos[i] < 0 { abort }?

		qdoc = append(qdoc[:0], bson.DocElem{"_id", op.Id})
		if op.Assert != DocMissing {
			var revnoq interface{}
			if n := revno[dkey]; n == 0 {
				// Revno 0 means the document predates txn bookkeeping,
				// so txn-revno must be absent.
				revnoq = bson.D{{"$exists", false}}
			} else {
				revnoq = n
			}
			// XXX Add tt to the query here, once we're sure it's all working.
			// Not having it increases the chances of breaking on bad logic.
			qdoc = append(qdoc, bson.DocElem{"txn-revno", revnoq})
			if op.Assert != DocExists {
				qdoc = append(qdoc, bson.DocElem{"$or", []interface{}{op.Assert}})
			}
		}

		c := f.tc.Database.C(op.C)
		if err := c.Find(qdoc).Select(bson.D{{"_id", 1}}).One(nil); err == mgo.ErrNotFound {
			// Assertion failed or someone else started applying.
			return f.abortOrReload(t, revnos, pull)
		} else if err != nil {
			return err
		}
	}
	f.debugf("Asserting %s succeeded", t)
	return nil
}
// abortOrReload moves t to taborted, pulling its leftover tokens from
// the affected document queues along the way. If another runner changed
// t's state concurrently, t is reloaded instead and no abort happens.
// revnos, when non-empty, tells for each op whether the document is
// live (revno >= 0) or stashed (revno < 0), deciding which collection
// to clean.
func (f *flusher) abortOrReload(t *transaction, revnos []int64, pull map[bson.ObjectId]*transaction) (err error) {
	f.debugf("Aborting or reloading %s (was %q)", t, t.State)
	if t.State == tprepared {
		qdoc := bson.D{{"_id", t.Id}, {"s", tprepared}}
		udoc := bson.D{{"$set", bson.D{{"s", taborting}}}}
		chaos("set-aborting")
		if err = f.tc.Update(qdoc, udoc); err == nil {
			t.State = taborting
		} else if err == mgo.ErrNotFound {
			// Lost the race; adopt whatever state won.
			if err = f.reload(t); err != nil || t.State != taborting {
				f.debugf("Won't abort %s. Reloaded state: %q", t, t.State)
				return err
			}
		} else {
			return err
		}
	} else if t.State != taborting {
		panic(fmt.Errorf("aborting transaction in invalid state: %q", t.State))
	}

	if len(revnos) > 0 {
		if pull == nil {
			pull = map[bson.ObjectId]*transaction{t.Id: t}
		}
		seen := make(map[docKey]bool)
		for i, op := range t.Ops {
			dkey := op.docKey()
			if seen[op.docKey()] {
				continue
			}
			seen[dkey] = true

			pullAll := tokensToPull(f.queue[dkey], pull, "")
			if len(pullAll) == 0 {
				continue
			}
			udoc := bson.D{{"$pullAll", bson.D{{"txn-queue", pullAll}}}}
			chaos("")
			if revnos[i] < 0 {
				// Negative revno: the document lives in the stash.
				err = f.sc.UpdateId(dkey, udoc)
			} else {
				c := f.tc.Database.C(dkey.C)
				err = c.UpdateId(dkey.Id, udoc)
			}
			if err != nil && err != mgo.ErrNotFound {
				return err
			}
		}
	}
	udoc := bson.D{{"$set", bson.D{{"s", taborted}}}}
	chaos("set-aborted")
	if err := f.tc.UpdateId(t.Id, udoc); err != nil && err != mgo.ErrNotFound {
		return err
	}
	t.State = taborted
	f.debugf("Aborted %s", t)
	return nil
}
// checkpoint persists onto t the txn-revno values ("r") the transaction
// must be applied against, moving it from tprepared to tapplying. If
// another runner won the race (ErrNotFound), t is reloaded instead.
//
// Fix: the function previously ended with `return nil`, silently
// swallowing any tc.Update error other than ErrNotFound; it now
// propagates err (which is nil on the success path).
func (f *flusher) checkpoint(t *transaction, revnos []int64) error {
	var debugRevnos map[docKey][]int64
	if debugEnabled {
		debugRevnos = make(map[docKey][]int64)
		for i, op := range t.Ops {
			dkey := op.docKey()
			debugRevnos[dkey] = append(debugRevnos[dkey], revnos[i])
		}
		f.debugf("Ready to apply %s. Saving revnos %v", t, debugRevnos)
	}

	// Save in t the txn-revno values the transaction must run on.
	qdoc := bson.D{{"_id", t.Id}, {"s", tprepared}}
	udoc := bson.D{{"$set", bson.D{{"s", tapplying}, {"r", revnos}}}}
	chaos("set-applying")
	err := f.tc.Update(qdoc, udoc)
	if err == nil {
		t.State = tapplying
		t.Revnos = revnos
		f.debugf("Ready to apply %s. Saving revnos %v: DONE", t, debugRevnos)
	} else if err == mgo.ErrNotFound {
		f.debugf("Ready to apply %s. Saving revnos %v: LOST RACE", t, debugRevnos)
		return f.reload(t)
	}
	return err
}
// apply executes every operation of t against its documents, assuming
// the revnos checkpointed in t.Revnos still hold. Each mutation query
// matches on txn-revno and txn-queue, so ops already applied elsewhere
// simply miss (ErrNotFound/duplicate) and are skipped. Finishes by
// recording the change log (when configured) and marking t applied.
func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) error {
	f.debugf("Applying transaction %s", t)
	if t.State != tapplying {
		panic(fmt.Errorf("applying transaction in invalid state: %q", t.State))
	}
	if pull == nil {
		pull = map[bson.ObjectId]*transaction{t.Id: t}
	}

	// logRevnos collects the post-op revnos for the change log.
	logRevnos := append([]int64(nil), t.Revnos...)
	logDoc := bson.D{{"_id", t.Id}}

	tt := tokenFor(t)
	for i := range t.Ops {
		op := &t.Ops[i]
		dkey := op.docKey()
		dqueue := f.queue[dkey]
		revno := t.Revnos[i]

		var opName string
		if debugEnabled {
			opName = op.name()
			f.debugf("Applying %s op %d (%s) on %v with txn-revno %d", t, i, opName, dkey, revno)
		}

		c := f.tc.Database.C(op.C)

		qdoc := bson.D{{"_id", dkey.Id}, {"txn-revno", revno}, {"txn-queue", tt}}
		if op.Insert != nil {
			// Inserts run against the stash, which is keyed by dkey.
			qdoc[0].Value = dkey
			if revno == -1 {
				qdoc[1].Value = bson.D{{"$exists", false}}
			}
		} else if revno == 0 {
			// There's no document with revno 0. The only way to see it is
			// when an existent document participates in a transaction the
			// first time. Txn-inserted documents get revno -1 while in the
			// stash for the first time, and -revno-1 == 2 when they go live.
			qdoc[1].Value = bson.D{{"$exists", false}}
		}

		pullAll := tokensToPull(dqueue, pull, tt)
		var d bson.D
		var outcome string
		var err error
		switch {
		case op.Update != nil:
			if revno < 0 {
				err = mgo.ErrNotFound
				f.debugf("Won't try to apply update op; negative revision means the document is missing or stashed")
			} else {
				newRevno := revno + 1
				logRevnos[i] = newRevno
				if d, err = objToDoc(op.Update); err != nil {
					return err
				}
				// Fold the token cleanup and revno bump into the same
				// update, so they happen atomically with the change.
				if d, err = addToDoc(d, "$pullAll", bson.D{{"txn-queue", pullAll}}); err != nil {
					return err
				}
				if d, err = addToDoc(d, "$set", bson.D{{"txn-revno", newRevno}}); err != nil {
					return err
				}
				chaos("")
				err = c.Update(qdoc, d)
			}
		case op.Remove:
			if revno < 0 {
				err = mgo.ErrNotFound
			} else {
				newRevno := -revno - 1
				logRevnos[i] = newRevno
				nonce := newNonce()
				stash := txnInfo{}
				change := mgo.Change{
					Update:    bson.D{{"$push", bson.D{{"n", nonce}}}},
					Upsert:    true,
					ReturnNew: true,
				}
				if _, err = f.sc.FindId(dkey).Apply(change, &stash); err != nil {
					return err
				}
				change = mgo.Change{
					Update:    bson.D{{"$set", bson.D{{"txn-remove", t.Id}}}},
					ReturnNew: true,
				}
				var info txnInfo
				if _, err = c.Find(qdoc).Apply(change, &info); err == nil {
					// The document still exists so the stash previously
					// observed was either out of date or necessarily
					// contained the token being applied.
					f.debugf("Marked document %v to be removed on revno %d with queue: %v", dkey, info.Revno, info.Queue)
					updated := false
					if !hasToken(stash.Queue, tt) {
						var set, unset bson.D
						if revno == 0 {
							// Missing revno in stash means -1.
							set = bson.D{{"txn-queue", info.Queue}}
							unset = bson.D{{"n", 1}, {"txn-revno", 1}}
						} else {
							set = bson.D{{"txn-queue", info.Queue}, {"txn-revno", newRevno}}
							unset = bson.D{{"n", 1}}
						}
						qdoc := bson.D{{"_id", dkey}, {"n", nonce}}
						udoc := bson.D{{"$set", set}, {"$unset", unset}}
						if err = f.sc.Update(qdoc, udoc); err == nil {
							updated = true
						} else if err != mgo.ErrNotFound {
							return err
						}
					}
					if updated {
						f.debugf("Updated stash for document %v with revno %d and queue: %v", dkey, newRevno, info.Queue)
					} else {
						f.debugf("Stash for document %v was up-to-date", dkey)
					}
					// This uses the outer qdoc (the live-document query);
					// the stash qdoc above is scoped to the if block.
					err = c.Remove(qdoc)
				}
			}
		case op.Insert != nil:
			if revno >= 0 {
				err = mgo.ErrNotFound
			} else {
				newRevno := -revno + 1
				logRevnos[i] = newRevno
				if d, err = objToDoc(op.Insert); err != nil {
					return err
				}
				change := mgo.Change{
					Update:    bson.D{{"$set", bson.D{{"txn-insert", t.Id}}}},
					ReturnNew: true,
				}
				chaos("")
				var info txnInfo
				if _, err = f.sc.Find(qdoc).Apply(change, &info); err == nil {
					f.debugf("Stash for document %v has revno %d and queue: %v", dkey, info.Revno, info.Queue)
					d = setInDoc(d, bson.D{{"_id", op.Id}, {"txn-revno", newRevno}, {"txn-queue", info.Queue}})
					// Unlikely yet unfortunate race in here if this gets seriously
					// delayed. If someone inserts+removes meanwhile, this will
					// reinsert, and there's no way to avoid that while keeping the
					// collection clean or compromising sharding. applyOps can solve
					// the former, but it can't shard (SERVER-1439).
					chaos("insert")
					err = c.Insert(d)
					if err == nil || mgo.IsDup(err) {
						if err == nil {
							f.debugf("New document %v inserted with revno %d and queue: %v", dkey, info.Revno, info.Queue)
						} else {
							f.debugf("Document %v already existed", dkey)
						}
						chaos("")
						if err = f.sc.Remove(qdoc); err == nil {
							f.debugf("Stash for document %v removed", dkey)
						}
					}
				}
			}
		case op.Assert != nil:
			// Pure assertion. No changes to apply.
		}
		if err == nil {
			outcome = "DONE"
		} else if err == mgo.ErrNotFound || mgo.IsDup(err) {
			// Someone else already applied this op; not an error.
			outcome = "MISS"
			err = nil
		} else {
			outcome = err.Error()
		}
		if debugEnabled {
			f.debugf("Applying %s op %d (%s) on %v with txn-revno %d: %s", t, i, opName, dkey, revno, outcome)
		}
		if err != nil {
			return err
		}

		if f.lc != nil && op.isChange() {
			// Add change to the log document.
			var dr bson.D
			for li := range logDoc {
				elem := &logDoc[li]
				if elem.Name == op.C {
					dr = elem.Value.(bson.D)
					break
				}
			}
			if dr == nil {
				logDoc = append(logDoc, bson.DocElem{op.C, bson.D{{"d", []interface{}{}}, {"r", []int64{}}}})
				dr = logDoc[len(logDoc)-1].Value.(bson.D)
			}
			dr[0].Value = append(dr[0].Value.([]interface{}), op.Id)
			dr[1].Value = append(dr[1].Value.([]int64), logRevnos[i])
		}
	}
	t.State = tapplied

	if f.lc != nil {
		// Insert log document into the changelog collection.
		f.debugf("Inserting %s into change log", t)
		err := f.lc.Insert(logDoc)
		if err != nil && !mgo.IsDup(err) {
			return err
		}
	}

	// It's been applied, so errors are ignored here. It's fine for someone
	// else to win the race and mark it as applied, and it's also fine for
	// it to remain pending until a later point when someone will perceive
	// it has been applied and mark it at such.
	f.debugf("Marking %s as applied", t)
	chaos("set-applied")
	f.tc.Update(bson.D{{"_id", t.Id}, {"s", tapplying}}, bson.D{{"$set", bson.D{{"s", tapplied}}}})
	return nil
}
// tokensToPull returns the tokens of dqueue that belong to transactions
// present in pull — leftover entries from already-handled transactions
// that may be cherry-picked out of the queue. dontPull itself is never
// included. The queue is walked back-to-front, preserving the original
// result ordering.
func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token {
	var leftovers []token
	for idx := len(dqueue) - 1; idx >= 0; idx-- {
		candidate := dqueue[idx]
		if candidate == dontPull {
			continue
		}
		if _, handled := pull[candidate.id()]; handled {
			leftovers = append(leftovers, candidate)
		}
	}
	return leftovers
}
// objToDoc converts an arbitrary value into an ordered bson.D document
// by round-tripping it through bson marshalling.
func objToDoc(obj interface{}) (bson.D, error) {
	raw, err := bson.Marshal(obj)
	if err != nil {
		return nil, err
	}
	var doc bson.D
	if err := bson.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	return doc, nil
}
// addToDoc merges add into doc under key: if key already exists its
// bson.D value is extended with add's elements; otherwise a new {key,
// add} element is appended. A non-bson.D value under key is an error.
func addToDoc(doc bson.D, key string, add bson.D) (bson.D, error) {
	for i := range doc {
		if doc[i].Name != key {
			continue
		}
		existing, ok := doc[i].Value.(bson.D)
		if !ok {
			return nil, fmt.Errorf("invalid %q value in change document: %#v", key, doc[i].Value)
		}
		doc[i].Value = append(existing, add...)
		return doc, nil
	}
	return append(doc, bson.DocElem{key, add}), nil
}
// setInDoc overlays set onto doc: elements whose names already appear
// among doc's original entries replace those values in place, and the
// rest are appended. Only the original length of doc is searched, so
// freshly appended elements are never matched against.
func setInDoc(doc bson.D, set bson.D) bson.D {
	origLen := len(doc)
	for _, elem := range set {
		replaced := false
		for d := 0; d < origLen; d++ {
			if doc[d].Name == elem.Name {
				doc[d].Value = elem.Value
				replaced = true
				break
			}
		}
		if !replaced {
			doc = append(doc, elem)
		}
	}
	return doc
}
// hasToken reports whether tt occurs in tokens.
func hasToken(tokens []token, tt token) bool {
	for i := range tokens {
		if tokens[i] == tt {
			return true
		}
	}
	return false
}
// debugf logs a debug message prefixed with this flusher's unique
// debug id, so interleaved output from concurrent flushes can be told
// apart.
func (f *flusher) debugf(format string, args ...interface{}) {
	if !debugEnabled {
		return
	}
	debugf(f.debugId+format, args...)
}
View File
+388
View File
@@ -0,0 +1,388 @@
package txn_test
import (
"flag"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/dbtest"
"gopkg.in/mgo.v2/txn"
. "gopkg.in/check.v1"
"math/rand"
"time"
)
var (
	// duration bounds each simulation run; override with -duration.
	duration = flag.Duration("duration", 200*time.Millisecond, "duration for each simulation")
	// seed seeds math/rand; the default 0 means derive a seed from the
	// current time (see simulate).
	seed = flag.Int64("seed", 0, "seed for rand")
)
// params configures one simulation run: the chaos settings installed
// via txn.SetChaos, worker/account density, and which transaction
// shapes the simulation generates.
type params struct {
	killChance     float64       // chance a chaos checkpoint kills a txn mid-flight
	slowdownChance float64       // chance a chaos checkpoint sleeps
	slowdown       time.Duration // how long a slowed-down checkpoint sleeps
	unsafe         bool
	workers        int // number of concurrent workers
	accounts       int // number of accounts shared by the workers
	changeHalf     bool
	reinsertCopy   bool
	reinsertZeroed bool
	changelog      bool // record changes into a capped changelog collection
	changes        int
}
// The TestSim* functions below exercise the txn runner under chaos
// (random kills and slowdowns) with varying worker counts, account
// density, and transaction shapes. Each delegates to simulate with a
// different params value; fewer accounts per worker means more
// contention on shared documents.
func (s *S) TestSim1Worker(c *C) {
	simulate(c, &s.server, params{
		workers:        1,
		accounts:       4,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSim4WorkersDense(c *C) {
	simulate(c, &s.server, params{
		workers:        4,
		accounts:       2,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSim4WorkersSparse(c *C) {
	simulate(c, &s.server, params{
		workers:        4,
		accounts:       10,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimHalf1Worker(c *C) {
	simulate(c, &s.server, params{
		workers:        1,
		accounts:       4,
		changeHalf:     true,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimHalf4WorkersDense(c *C) {
	simulate(c, &s.server, params{
		workers:        4,
		accounts:       2,
		changeHalf:     true,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimHalf4WorkersSparse(c *C) {
	simulate(c, &s.server, params{
		workers:        4,
		accounts:       10,
		changeHalf:     true,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimReinsertCopy1Worker(c *C) {
	simulate(c, &s.server, params{
		workers:        1,
		accounts:       10,
		reinsertCopy:   true,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimReinsertCopy4Workers(c *C) {
	simulate(c, &s.server, params{
		workers:        4,
		accounts:       10,
		reinsertCopy:   true,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimReinsertZeroed1Worker(c *C) {
	simulate(c, &s.server, params{
		workers:        1,
		accounts:       10,
		reinsertZeroed: true,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimReinsertZeroed4Workers(c *C) {
	simulate(c, &s.server, params{
		workers:        4,
		accounts:       10,
		reinsertZeroed: true,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
	})
}

func (s *S) TestSimChangeLog(c *C) {
	simulate(c, &s.server, params{
		workers:        4,
		accounts:       10,
		killChance:     0.01,
		slowdownChance: 0.3,
		slowdown:       100 * time.Millisecond,
		changelog:      true,
	})
}
// balanceChange records one attempted transfer so the verification
// phase of simulate can replay the outcome of transaction id and
// recompute the expected final balances.
type balanceChange struct {
	id     bson.ObjectId
	origin int
	target int
	amount int
}
// simulate hammers a fresh database with concurrent transfer
// transactions under the chaos settings in params, then — after
// disabling chaos and resuming everything — verifies that the final
// account state is exactly reproducible from the log of attempted
// transactions.
func simulate(c *C, server *dbtest.DBServer, params params) {
	// A fixed -seed makes a failing run reproducible.
	seed := *seed
	if seed == 0 {
		seed = time.Now().UnixNano()
	}
	rand.Seed(seed)
	c.Logf("Seed: %v", seed)

	txn.SetChaos(txn.Chaos{
		KillChance:     params.killChance,
		SlowdownChance: params.slowdownChance,
		Slowdown:       params.slowdown,
	})
	defer txn.SetChaos(txn.Chaos{})

	session := server.Session()
	defer session.Close()

	db := session.DB("test")
	tc := db.C("tc")
	runner := txn.NewRunner(tc)

	tclog := db.C("tc.log")
	if params.changelog {
		info := mgo.CollectionInfo{
			Capped:   true,
			MaxBytes: 1000000,
		}
		err := tclog.Create(&info)
		c.Assert(err, IsNil)
		runner.ChangeLog(tclog)
	}

	// Seed every account with the same starting balance so the global
	// total is a known invariant below.
	accounts := db.C("accounts")
	for i := 0; i < params.accounts; i++ {
		err := accounts.Insert(M{"_id": i, "balance": 300})
		c.Assert(err, IsNil)
	}
	var stop time.Time
	if params.changes <= 0 {
		stop = time.Now().Add(*duration)
	}

	// The reinsert modes target ids beyond the created range so some
	// operations hit missing documents.
	max := params.accounts
	if params.reinsertCopy || params.reinsertZeroed {
		max = int(float64(params.accounts) * 1.5)
	}

	changes := make(chan balanceChange, 1024)

	//session.SetMode(mgo.Eventual, true)
	for i := 0; i < params.workers; i++ {
		go func() {
			n := 0
			for {
				// Stop either after params.changes transactions or
				// when the -duration deadline passes.
				if n > 0 && n == params.changes {
					break
				}
				if !stop.IsZero() && time.Now().After(stop) {
					break
				}

				change := balanceChange{
					id:     bson.NewObjectId(),
					origin: rand.Intn(max),
					target: rand.Intn(max),
					amount: 100,
				}

				var old Account
				var oldExists bool
				if params.reinsertCopy || params.reinsertZeroed {
					if err := accounts.FindId(change.origin).One(&old); err != mgo.ErrNotFound {
						c.Check(err, IsNil)
						change.amount = old.Balance
						oldExists = true
					}
				}

				var ops []txn.Op
				switch {
				case params.reinsertCopy && oldExists:
					// Move the whole balance by removing origin and
					// inserting target with the same amount.
					ops = []txn.Op{{
						C:      "accounts",
						Id:     change.origin,
						Assert: M{"balance": change.amount},
						Remove: true,
					}, {
						C:      "accounts",
						Id:     change.target,
						Assert: txn.DocMissing,
						Insert: M{"balance": change.amount},
					}}
				case params.reinsertZeroed && oldExists:
					// Insert target at zero first, then remove origin
					// and credit the amount onto target.
					ops = []txn.Op{{
						C:      "accounts",
						Id:     change.target,
						Assert: txn.DocMissing,
						Insert: M{"balance": 0},
					}, {
						C:      "accounts",
						Id:     change.origin,
						Assert: M{"balance": change.amount},
						Remove: true,
					}, {
						C:      "accounts",
						Id:     change.target,
						Assert: txn.DocExists,
						Update: M{"$inc": M{"balance": change.amount}},
					}}
				case params.changeHalf:
					// Apply the transfer as two half-amount updates per
					// account within the same transaction.
					ops = []txn.Op{{
						C:      "accounts",
						Id:     change.origin,
						Assert: M{"balance": M{"$gte": change.amount}},
						Update: M{"$inc": M{"balance": -change.amount / 2}},
					}, {
						C:      "accounts",
						Id:     change.target,
						Assert: txn.DocExists,
						Update: M{"$inc": M{"balance": change.amount / 2}},
					}, {
						C:      "accounts",
						Id:     change.origin,
						Update: M{"$inc": M{"balance": -change.amount / 2}},
					}, {
						C:      "accounts",
						Id:     change.target,
						Update: M{"$inc": M{"balance": change.amount / 2}},
					}}
				default:
					// Plain guarded transfer between two accounts.
					ops = []txn.Op{{
						C:      "accounts",
						Id:     change.origin,
						Assert: M{"balance": M{"$gte": change.amount}},
						Update: M{"$inc": M{"balance": -change.amount}},
					}, {
						C:      "accounts",
						Id:     change.target,
						Assert: txn.DocExists,
						Update: M{"$inc": M{"balance": change.amount}},
					}}
				}

				// Aborts and chaos kills are expected outcomes here;
				// anything else is a real failure.
				err := runner.Run(ops, change.id, nil)
				if err != nil && err != txn.ErrAborted && err != txn.ErrChaos {
					c.Check(err, IsNil)
				}
				n++
				changes <- change
			}
			// Zero-valued sentinel tells the collector this worker quit.
			changes <- balanceChange{}
		}()
	}

	alive := params.workers
	changeLog := make([]balanceChange, 0, 1024)
	for alive > 0 {
		change := <-changes
		if change.id == "" {
			alive--
		} else {
			changeLog = append(changeLog, change)
		}
	}
	c.Check(len(changeLog), Not(Equals), 0, Commentf("No operations were even attempted."))

	// Disable chaos and drive every interrupted transaction to a
	// terminal state before verifying invariants.
	txn.SetChaos(txn.Chaos{})
	err := runner.ResumeAll()
	c.Assert(err, IsNil)

	n, err := accounts.Count()
	c.Check(err, IsNil)
	c.Check(n, Equals, params.accounts, Commentf("Number of accounts has changed."))

	n, err = accounts.Find(M{"balance": M{"$lt": 0}}).Count()
	c.Check(err, IsNil)
	c.Check(n, Equals, 0, Commentf("There are %d accounts with negative balance.", n))

	globalBalance := 0
	iter := accounts.Find(nil).Iter()
	account := Account{}
	for iter.Next(&account) {
		globalBalance += account.Balance
	}
	c.Check(iter.Close(), IsNil)
	c.Check(globalBalance, Equals, params.accounts*300, Commentf("Total amount of money should be constant."))

	// Compute and verify the exact final state of all accounts.
	balance := make(map[int]int)
	for i := 0; i < params.accounts; i++ {
		balance[i] += 300
	}
	var applied, aborted int
	for _, change := range changeLog {
		// Resume tells us whether this particular transaction ended
		// up applied or aborted.
		err := runner.Resume(change.id)
		if err == txn.ErrAborted {
			aborted++
			continue
		} else if err != nil {
			c.Fatalf("resuming %s failed: %v", change.id, err)
		}
		balance[change.origin] -= change.amount
		balance[change.target] += change.amount
		applied++
	}
	iter = accounts.Find(nil).Iter()
	for iter.Next(&account) {
		c.Assert(account.Balance, Equals, balance[account.Id])
	}
	c.Check(iter.Close(), IsNil)
	c.Logf("Total transactions: %d (%d applied, %d aborted)", len(changeLog), applied, aborted)

	if params.changelog {
		n, err := tclog.Count()
		c.Assert(err, IsNil)
		// Check if the capped collection is full.
		dummy := make([]byte, 1024)
		tclog.Insert(M{"_id": bson.NewObjectId(), "dummy": dummy})
		m, err := tclog.Count()
		c.Assert(err, IsNil)
		if m == n+1 {
			// Wasn't full, so it must have seen it all.
			c.Assert(err, IsNil)
			c.Assert(n, Equals, applied)
		}
	}
}
+94
View File
@@ -0,0 +1,94 @@
package txn
import (
"gopkg.in/mgo.v2/bson"
"sort"
)
// tarjanSort splits the graph described by successors into its
// strongly connected components, emitted in reverse topological
// order. Ids inside each multi-node component are sorted so the
// output is deterministic across runs.
func tarjanSort(successors map[bson.ObjectId][]bson.ObjectId) [][]bson.ObjectId {
	// http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
	data := &tarjanData{
		successors: successors,
		nodes:      make([]tarjanNode, 0, len(successors)),
		index:      make(map[bson.ObjectId]int, len(successors)),
	}

	for id := range successors {
		// Take a private copy of the key before walking from it.
		id := bson.ObjectId(string(id))
		if _, seen := data.index[id]; !seen {
			data.strongConnect(id)
		}
	}

	// Sort connected components to stabilize the algorithm.
	for _, component := range data.output {
		if len(component) > 1 {
			sort.Sort(idList(component))
		}
	}
	return data.output
}
// tarjanData holds the mutable state of one tarjanSort run.
type tarjanData struct {
	successors map[bson.ObjectId][]bson.ObjectId
	output     [][]bson.ObjectId // finished components, reverse topological order
	nodes      []tarjanNode      // one entry per visited id, in visit order
	stack      []bson.ObjectId   // ids of the component being built
	index      map[bson.ObjectId]int // id -> position in nodes
}

// tarjanNode tracks the classic per-node Tarjan bookkeeping; the
// node's own index doubles as its initial lowlink.
type tarjanNode struct {
	lowlink int
	stacked bool
}

// idList orders ObjectIds lexically for the stabilizing sort.
type idList []bson.ObjectId

func (l idList) Len() int           { return len(l) }
func (l idList) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }
func (l idList) Less(i, j int) bool { return l[i] < l[j] }
// strongConnect pushes id, visits its successors depth-first, and
// emits a strongly connected component whenever id turns out to be a
// root (lowlink == index). It returns the node created for id so the
// caller can propagate lowlink values upwards.
//
// NOTE(review): node points into data.nodes, which is appended to
// during recursion. tarjanSort preallocates capacity for one node per
// successors key, so the appends do not reallocate as long as every
// reachable id is also a key of the map — confirm callers guarantee
// that.
func (data *tarjanData) strongConnect(id bson.ObjectId) *tarjanNode {
	index := len(data.nodes)
	data.index[id] = index
	data.stack = append(data.stack, id)
	data.nodes = append(data.nodes, tarjanNode{index, true})
	node := &data.nodes[index]

	for _, succid := range data.successors[id] {
		succindex, seen := data.index[succid]
		if !seen {
			// Unvisited successor: recurse and adopt its lowlink.
			succnode := data.strongConnect(succid)
			if succnode.lowlink < node.lowlink {
				node.lowlink = succnode.lowlink
			}
		} else if data.nodes[succindex].stacked {
			// Part of the current strongly-connected component.
			if succindex < node.lowlink {
				node.lowlink = succindex
			}
		}
	}

	if node.lowlink == index {
		// Root node; pop stack and output new
		// strongly-connected component.
		var scc []bson.ObjectId
		i := len(data.stack) - 1
		for {
			stackid := data.stack[i]
			stackindex := data.index[stackid]
			data.nodes[stackindex].stacked = false
			scc = append(scc, stackid)
			if stackindex == index {
				break
			}
			i--
		}
		data.stack = data.stack[:i]
		data.output = append(data.output, scc)
	}
	return node
}
+44
View File
@@ -0,0 +1,44 @@
package txn
import (
"fmt"
"gopkg.in/mgo.v2/bson"
. "gopkg.in/check.v1"
)
// TarjanSuite groups the pure (no database) tests for tarjanSort.
type TarjanSuite struct{}

var _ = Suite(TarjanSuite{})
// bid builds a deterministic ObjectId whose textual form is n
// zero-padded to 24 digits, giving the test graph stable, readable ids.
func bid(n int) bson.ObjectId {
	s := fmt.Sprintf("%024d", n)
	return bson.ObjectId(s)
}
// bids maps each integer through bid. It intentionally returns nil
// (not an empty slice) when called with no arguments.
func bids(ns ...int) []bson.ObjectId {
	var ids []bson.ObjectId
	for _, n := range ns {
		ids = append(ids, bid(n))
	}
	return ids
}
// TestExample runs tarjanSort on a fixed graph and checks both the
// component grouping and their reverse topological order.
func (TarjanSuite) TestExample(c *C) {
	successors := map[bson.ObjectId][]bson.ObjectId{
		bid(1): bids(2, 3),
		bid(2): bids(1, 5),
		bid(3): bids(4),
		bid(4): bids(3, 5),
		bid(5): bids(6),
		bid(6): bids(7),
		bid(7): bids(8),
		bid(8): bids(6, 9),
		bid(9): bids(),
	}

	c.Assert(tarjanSort(successors), DeepEquals, [][]bson.ObjectId{
		bids(9),
		bids(6, 7, 8),
		bids(5),
		bids(3, 4),
		bids(1, 2),
	})
}
+611
View File
@@ -0,0 +1,611 @@
// The txn package implements support for multi-document transactions.
//
// For details check the following blog post:
//
// http://blog.labix.org/2012/08/22/multi-doc-transactions-for-mongodb
//
package txn
import (
"encoding/binary"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
crand "crypto/rand"
mrand "math/rand"
)
// state is the lifecycle stage of a transaction document, persisted
// in its "s" field.
type state int

const (
	tpreparing state = 1 // One or more documents not prepared
	tprepared  state = 2 // Prepared but not yet ready to run
	taborting  state = 3 // Assertions failed, cleaning up
	tapplying  state = 4 // Changes are in progress
	taborted   state = 5 // Pre-conditions failed, nothing done
	tapplied   state = 6 // All changes applied
)

// String returns the human-readable name of s, panicking on any value
// outside the six known states.
func (s state) String() string {
	names := [...]string{
		tpreparing: "preparing",
		tprepared:  "prepared",
		taborting:  "aborting",
		tapplying:  "applying",
		taborted:   "aborted",
		tapplied:   "applied",
	}
	if s >= tpreparing && s <= tapplied {
		return names[s]
	}
	panic(fmt.Errorf("unknown state: %d", s))
}
// rand is a private PRNG used for nonce generation; randmu guards it
// because *mrand.Rand is not safe for concurrent use.
var rand *mrand.Rand
var randmu sync.Mutex

// init seeds the nonce PRNG from crypto/rand so independent processes
// produce different nonce sequences.
func init() {
	var seed int64
	err := binary.Read(crand.Reader, binary.BigEndian, &seed)
	if err != nil {
		panic(err)
	}
	rand = mrand.New(mrand.NewSource(seed))
}
// transaction is the persistent document describing one transaction,
// stored in the runner's txn collection.
type transaction struct {
	Id     bson.ObjectId `bson:"_id"`
	State  state         `bson:"s"`
	Info   interface{}   `bson:"i,omitempty"` // caller-provided metadata
	Ops    []Op          `bson:"o"`
	Nonce  string        `bson:"n,omitempty"` // distinguishes attempts; see token
	Revnos []int64       `bson:"r,omitempty"` // revnos observed per doc key

	// docKeysCached memoizes docKeys(); not persisted.
	docKeysCached docKeys
}
// String renders the transaction for log messages: the bare hex id
// before a nonce is assigned, the full token afterwards.
func (t *transaction) String() string {
	if t.Nonce != "" {
		return string(t.token())
	}
	return t.Id.Hex()
}

// done reports whether the transaction reached a terminal state.
func (t *transaction) done() bool {
	switch t.State {
	case tapplied, taborted:
		return true
	}
	return false
}

// token returns the transaction's unique token, panicking when no
// nonce has been assigned yet.
func (t *transaction) token() token {
	if t.Nonce == "" {
		panic("transaction has no nonce")
	}
	return tokenFor(t)
}
// docKeys returns the sorted, de-duplicated set of document keys
// touched by the transaction's operations, caching the result on the
// transaction.
func (t *transaction) docKeys() docKeys {
	if t.docKeysCached != nil {
		return t.docKeysCached
	}
	keys := make(docKeys, 0, len(t.Ops))
	for _, op := range t.Ops {
		key := op.docKey()
		seen := false
		for _, k := range keys {
			if k == key {
				seen = true
				break
			}
		}
		if !seen {
			keys = append(keys, key)
		}
	}
	sort.Sort(keys)
	t.docKeysCached = keys
	return keys
}
// tokenFor returns a unique transaction token composed of t's id and
// a nonce. When t already has a nonce it is reused; otherwise a fresh
// one is generated.
func tokenFor(t *transaction) token {
	if t.Nonce != "" {
		return token(t.Id.Hex() + "_" + t.Nonce)
	}
	return token(t.Id.Hex() + "_" + newNonce())
}
// newNonce draws 32 random bits and renders them as 8 hex digits,
// least-significant nibble first. The PRNG is shared, so access is
// serialized with randmu.
func newNonce() string {
	randmu.Lock()
	r := rand.Uint32()
	randmu.Unlock()
	const hexdigits = "0123456789abcdef"
	var buf [8]byte
	for i := range buf {
		buf[i] = hexdigits[r&0xf]
		r >>= 4
	}
	return string(buf[:])
}
// token uniquely identifies one attempt at running a transaction:
// 24 hex characters of the transaction id, an underscore, then the
// 8-character nonce.
type token string

func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) }
func (tt token) nonce() string     { return string(tt[25:]) } // position 24 is the "_" separator
// Op represents an operation to a single document that may be
// applied as part of a transaction with other operations.
type Op struct {
	// C and Id identify the collection and document this operation
	// refers to. Id is matched against the "_id" document field.
	C  string      `bson:"c"`
	Id interface{} `bson:"d"`

	// Assert optionally holds a query document that is used to
	// test the operation document at the time the transaction is
	// going to be applied. The assertions for all operations in
	// a transaction are tested before any changes take place,
	// and the transaction is entirely aborted if any of them
	// fails. This is also the only way to prevent a transaction
	// from being applied (the transaction continues despite
	// the outcome of Insert, Update, and Remove).
	Assert interface{} `bson:"a,omitempty"`

	// The Insert, Update and Remove fields describe the mutation
	// intended by the operation. At most one of them may be set
	// per operation. If none are set, Assert must be set and the
	// operation becomes a read-only test.
	//
	// Insert holds the document to be inserted at the time the
	// transaction is applied. The Id field will be inserted
	// into the document automatically as its _id field. The
	// transaction will continue even if the document already
	// exists. Use Assert with txn.DocMissing if the insertion is
	// required.
	//
	// Update holds the update document to be applied at the time
	// the transaction is applied. The transaction will continue
	// even if a document with Id is missing. Use Assert to
	// test for the document presence or its contents.
	//
	// Remove indicates whether to remove the document with Id.
	// The transaction continues even if the document doesn't yet
	// exist at the time the transaction is applied. Use Assert
	// with txn.DocExists to make sure it will be removed.
	Insert interface{} `bson:"i,omitempty"`
	Update interface{} `bson:"u,omitempty"`
	Remove bool        `bson:"r,omitempty"`
}
// isChange reports whether the operation mutates its document rather
// than only asserting on it.
func (op *Op) isChange() bool {
	if op.Remove {
		return true
	}
	return op.Update != nil || op.Insert != nil
}

// docKey returns the collection/id pair this operation targets.
func (op *Op) docKey() docKey {
	return docKey{C: op.C, Id: op.Id}
}

// name returns a short label for the operation kind, checking the
// mutations in priority order and falling back to "assert" or "none".
func (op *Op) name() string {
	if op.Update != nil {
		return "update"
	}
	if op.Insert != nil {
		return "insert"
	}
	if op.Remove {
		return "remove"
	}
	if op.Assert != nil {
		return "assert"
	}
	return "none"
}
const (
	// DocExists and DocMissing may be used on an operation's
	// Assert value to assert that the document with the given
	// Id exists or does not exist, respectively.
	DocExists  = "d+"
	DocMissing = "d-"
)
// A Runner applies operations as part of a transaction onto any number
// of collections within a database. See the Run method for details.
type Runner struct {
	tc *mgo.Collection // txns: one document per transaction
	sc *mgo.Collection // stash: holds docs through insert/remove
	lc *mgo.Collection // log: optional change log, nil unless ChangeLog was called
}
// NewRunner returns a new transaction runner that uses tc to hold its
// transactions.
//
// Multiple transaction collections may exist in a single database, but
// all collections that are touched by operations in a given transaction
// collection must be handled exclusively by it.
//
// A second collection with the same name of tc but suffixed by ".stash"
// will be used for implementing the transactional behavior of insert
// and remove operations.
func NewRunner(tc *mgo.Collection) *Runner {
	stash := tc.Database.C(tc.Name + ".stash")
	return &Runner{tc: tc, sc: stash, lc: nil}
}
// ErrAborted is returned by Run and Resume when one or more operation
// assertions failed and the transaction was entirely rolled back.
var ErrAborted = fmt.Errorf("transaction aborted")
// Run creates a new transaction with ops and runs it immediately.
// The id parameter specifies the transaction id, and may be written
// down ahead of time to later verify the success of the change and
// resume it, when the procedure is interrupted for any reason. If
// empty, a random id will be generated.
// The info parameter, if not nil, is included under the "i"
// field of the transaction document.
//
// Operations across documents are not atomically applied, but are
// guaranteed to be eventually all applied in the order provided or
// all aborted, as long as the affected documents are only modified
// through transactions. If documents are simultaneously modified
// by transactions and out of transactions the behavior is undefined.
//
// If Run returns no errors, all operations were applied successfully.
// If it returns ErrAborted, one or more operations can't be applied
// and the transaction was entirely aborted with no changes performed.
// Otherwise, if the transaction is interrupted while running for any
// reason, it may be resumed explicitly or by attempting to apply
// another transaction on any of the documents targeted by ops, as
// long as the interruption was made after the transaction document
// itself was inserted. Run Resume with the obtained transaction id
// to confirm whether the transaction was applied or not.
//
// Any number of transactions may be run concurrently, with one
// runner or many.
func (r *Runner) Run(ops []Op, id bson.ObjectId, info interface{}) (err error) {
	const efmt = "error in transaction op %d: %s"
	// Validate each op up front: C and Id are mandatory, and at most
	// one of Insert/Update/Remove may be set (none requires Assert).
	for i := range ops {
		op := &ops[i]
		if op.C == "" || op.Id == nil {
			return fmt.Errorf(efmt, i, "C or Id missing")
		}
		changes := 0
		if op.Insert != nil {
			changes++
		}
		if op.Update != nil {
			changes++
		}
		if op.Remove {
			changes++
		}
		if changes > 1 {
			return fmt.Errorf(efmt, i, "more than one of Insert/Update/Remove set")
		}
		if changes == 0 && op.Assert == nil {
			return fmt.Errorf(efmt, i, "none of Assert/Insert/Update/Remove set")
		}
	}

	if id == "" {
		id = bson.NewObjectId()
	}

	// Insert transaction sooner rather than later, to stay on the safer side.
	t := transaction{
		Id:    id,
		Ops:   ops,
		State: tpreparing,
		Info:  info,
	}
	if err = r.tc.Insert(&t); err != nil {
		return err
	}
	// flush drives the transaction to a terminal state; taborted and
	// tapplied are the only valid outcomes here.
	if err = flush(r, &t); err != nil {
		return err
	}
	if t.State == taborted {
		return ErrAborted
	} else if t.State != tapplied {
		panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
	}
	return nil
}
// ResumeAll resumes all pending transactions. All ErrAborted errors
// from individual transactions are ignored.
func (r *Runner) ResumeAll() (err error) {
	debugf("Resuming all unfinished transactions")
	iter := r.tc.Find(bson.D{{"s", bson.D{{"$in", []state{tpreparing, tprepared, tapplying}}}}}).Iter()
	var t transaction
	for iter.Next(&t) {
		// The query already filters on state, but the document may
		// have progressed between the query and this read.
		if t.State == tapplied || t.State == taborted {
			continue
		}
		debugf("Resuming %s from %q", t.Id, t.State)
		if err := flush(r, &t); err != nil {
			return err
		}
		if !t.done() {
			panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
		}
	}
	return nil
}
// Resume resumes the transaction with id. It returns an error if the
// transaction is not found (note: load wraps mgo.ErrNotFound into a
// descriptive error rather than returning it directly). Otherwise, it
// has the same semantics of the Run method after the transaction is
// inserted.
func (r *Runner) Resume(id bson.ObjectId) (err error) {
	t, err := r.load(id)
	if err != nil {
		return err
	}
	if !t.done() {
		debugf("Resuming %s from %q", t, t.State)
		if err := flush(r, t); err != nil {
			return err
		}
	}
	if t.State == taborted {
		return ErrAborted
	} else if t.State != tapplied {
		panic(fmt.Errorf("invalid state for %s after flush: %q", t, t.State))
	}
	return nil
}
// ChangeLog enables logging of changes to the given collection
// every time a transaction that modifies content is done being
// applied.
//
// Saved documents are in the format:
//
//     {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}}
//
// The document revision is the value of the txn-revno field after
// the change has been applied. Negative values indicate the document
// was not present in the collection. Revisions will not change when
// updates or removes are applied to missing documents or inserts are
// attempted when the document isn't present.
func (r *Runner) ChangeLog(logc *mgo.Collection) {
	r.lc = logc
}
// PurgeMissing removes from collections any state that refers to transaction
// documents that for whatever reason have been lost from the system (removed
// by accident or lost in a hard crash, for example).
//
// This method should very rarely be needed, if at all, and should never be
// used during the normal operation of an application. Its purpose is to put
// a system that has seen unavoidable corruption back in a working state.
func (r *Runner) PurgeMissing(collections ...string) error {
	type M map[string]interface{}
	type S []interface{} // NOTE(review): unused in the visible body — confirm before removing

	type TDoc struct {
		Id       interface{} "_id"
		TxnQueue []string    "txn-queue"
	}

	// found caches transaction ids already confirmed to exist so each
	// id is looked up in the txn collection at most once.
	found := make(map[bson.ObjectId]bool)

	sort.Strings(collections)
	for _, collection := range collections {
		c := r.tc.Database.C(collection)
		iter := c.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
		var tdoc TDoc
		for iter.Next(&tdoc) {
			for _, txnToken := range tdoc.TxnQueue {
				// The first 24 chars of a queue token are the txn id hex.
				txnId := bson.ObjectIdHex(txnToken[:24])
				if found[txnId] {
					continue
				}
				if r.tc.FindId(txnId).One(nil) == nil {
					found[txnId] = true
					continue
				}
				logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tdoc.Id, txnId)
				// The "_*" suffix looks odd but, since $regex matches
				// unanchored at the end, "^<hex>_*" pulls every token
				// that starts with the id's 24 hex chars.
				err := c.UpdateId(tdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
				if err != nil {
					return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
				}
			}
		}
		if err := iter.Close(); err != nil {
			return fmt.Errorf("transaction queue iteration error for %s: %v", collection, err)
		}
	}

	type StashTDoc struct {
		Id       docKey   "_id"
		TxnQueue []string "txn-queue"
	}

	// Repeat the same purge over the stash collection.
	iter := r.sc.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
	var stdoc StashTDoc
	for iter.Next(&stdoc) {
		for _, txnToken := range stdoc.TxnQueue {
			txnId := bson.ObjectIdHex(txnToken[:24])
			if found[txnId] {
				continue
			}
			if r.tc.FindId(txnId).One(nil) == nil {
				found[txnId] = true
				continue
			}
			logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stdoc.Id.C, stdoc.Id.Id, txnId)
			err := r.sc.UpdateId(stdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
			if err != nil {
				return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
			}
		}
	}
	if err := iter.Close(); err != nil {
		return fmt.Errorf("transaction stash iteration error: %v", err)
	}

	return nil
}
// load fetches the transaction document with the given id, mapping
// mgo.ErrNotFound into a more descriptive error.
func (r *Runner) load(id bson.ObjectId) (*transaction, error) {
	var t transaction
	switch err := r.tc.FindId(id).One(&t); err {
	case nil:
		return &t, nil
	case mgo.ErrNotFound:
		return nil, fmt.Errorf("cannot find transaction %s", id)
	default:
		return nil, err
	}
}
// typeNature buckets document id types so that ids of different Go
// types still have a total order.
type typeNature int

const (
	// The order of these values matters. Transactions
	// from applications using different ordering will
	// be incompatible with each other.
	_ typeNature = iota
	natureString
	natureInt
	natureFloat
	natureBool
	natureStruct
)

// valueNature classifies v into one of the natures above and returns
// it alongside a normalized value (all ints widened to int64, all
// floats to float64). It panics for any unsupported id type.
func valueNature(v interface{}) (value interface{}, nature typeNature) {
	rval := reflect.ValueOf(v)
	kind := rval.Kind()
	switch kind {
	case reflect.String:
		return rval.String(), natureString
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return rval.Int(), natureInt
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return int64(rval.Uint()), natureInt
	case reflect.Float32, reflect.Float64:
		return rval.Float(), natureFloat
	case reflect.Bool:
		return rval.Bool(), natureBool
	case reflect.Struct:
		return v, natureStruct
	}
	panic("document id type unsupported by txn: " + kind.String())
}
// docKey identifies a single document: collection name plus _id value.
type docKey struct {
	C  string
	Id interface{}
}

// docKeys sorts document keys by collection first, then by id using
// the type-aware comparison in valuecmp.
type docKeys []docKey

func (ks docKeys) Len() int      { return len(ks) }
func (ks docKeys) Swap(i, j int) { ks[i], ks[j] = ks[j], ks[i] }
func (ks docKeys) Less(i, j int) bool {
	a, b := ks[i], ks[j]
	if a.C != b.C {
		return a.C < b.C
	}
	return valuecmp(a.Id, b.Id) == -1
}
// valuecmp compares two document id values and returns -1, 0, or 1.
// Values of different natures order by nature (see typeNature); equal
// natures compare by their normalized value, with structs delegated
// to structcmp.
func valuecmp(a, b interface{}) int {
	av, an := valueNature(a)
	bv, bn := valueNature(b)
	if an < bn {
		return -1
	}
	if an > bn {
		return 1
	}

	if av == bv {
		return 0
	}
	var less bool
	switch an {
	case natureString:
		less = av.(string) < bv.(string)
	case natureInt:
		less = av.(int64) < bv.(int64)
	case natureFloat:
		less = av.(float64) < bv.(float64)
	case natureBool:
		// false sorts before true.
		less = !av.(bool) && bv.(bool)
	case natureStruct:
		less = structcmp(av, bv) == -1
	default:
		panic("unreachable")
	}
	if less {
		return -1
	}
	return 1
}
// structcmp compares two struct values field by field and returns
// -1, 0, or 1. Each round takes the next exported field from each
// side and compares first the field values (via valuecmp) and then
// the bson field names; the first difference decides the ordering.
// A struct that runs out of exported fields first sorts before the
// longer one.
//
// Fix over the previous implementation: when exactly one side ran
// out of exported fields, the old loop compared a stale value left
// over from the previous round, and its tie-break used `ai == bn`
// (one side's index against the OTHER side's field count), which
// could invert the ordering of structs with differing exported field
// counts. Exhaustion is now checked before any comparison.
func structcmp(a, b interface{}) int {
	av := reflect.ValueOf(a)
	bv := reflect.ValueOf(b)

	ai, an := 0, av.NumField()
	bi, bn := 0, bv.NumField()
	for {
		// Advance each side to its next exported field, if any.
		var af, bf reflect.StructField
		var avi, bvi interface{}
		var aok, bok bool
		for ai < an {
			af = av.Type().Field(ai)
			if isExported(af.Name) {
				avi = av.Field(ai).Interface()
				aok = true
				ai++
				break
			}
			ai++
		}
		for bi < bn {
			bf = bv.Type().Field(bi)
			if isExported(bf.Name) {
				bvi = bv.Field(bi).Interface()
				bok = true
				bi++
				break
			}
			bi++
		}
		switch {
		case !aok && !bok:
			return 0 // both exhausted with every field equal
		case !aok:
			return -1 // a is a strict prefix of b
		case !bok:
			return 1 // b is a strict prefix of a
		}
		if n := valuecmp(avi, bvi); n != 0 {
			return n
		}
		nameA := getFieldName(af)
		nameB := getFieldName(bf)
		if nameA < nameB {
			return -1
		}
		if nameA > nameB {
			return 1
		}
	}
}
// isExported reports whether a struct field name is exported.
// NOTE(review): this only recognizes ASCII upper-case initials; a
// field starting with a non-ASCII upper-case letter would be treated
// as unexported — confirm that is acceptable for id structs.
func isExported(name string) bool {
	return name[0] >= 'A' && name[0] <= 'Z'
}
func getFieldName(f reflect.StructField) string {
name := f.Tag.Get("bson")
if i := strings.Index(name, ","); i >= 0 {
name = name[:i]
}
if name == "" {
name = strings.ToLower(f.Name)
}
return name
}
+778
View File
@@ -0,0 +1,778 @@
package txn_test
import (
"flag"
"fmt"
"sync"
"testing"
"time"
. "gopkg.in/check.v1"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/dbtest"
"gopkg.in/mgo.v2/txn"
)
// TestAll hooks the gocheck suites into the standard "go test" runner.
func TestAll(t *testing.T) {
	TestingT(t)
}

// S is the suite state shared by the txn tests: one throwaway mongod
// plus handles to the collections every test touches.
type S struct {
	server   dbtest.DBServer
	session  *mgo.Session
	db       *mgo.Database
	tc, sc   *mgo.Collection
	accounts *mgo.Collection
	runner   *txn.Runner
}

var _ = Suite(&S{})

// M is shorthand for an unordered BSON document in test fixtures.
type M map[string]interface{}

func (s *S) SetUpSuite(c *C) {
	s.server.SetPath(c.MkDir())
}

func (s *S) TearDownSuite(c *C) {
	s.server.Stop()
}

// SetUpTest wipes the server and rebuilds session, collections and
// runner so each test starts from an empty, chaos-free state with
// debug logging routed into the gocheck log.
func (s *S) SetUpTest(c *C) {
	s.server.Wipe()
	txn.SetChaos(txn.Chaos{})
	txn.SetLogger(c)
	txn.SetDebug(true)
	s.session = s.server.Session()
	s.db = s.session.DB("test")
	s.tc = s.db.C("tc")
	s.sc = s.db.C("tc.stash")
	s.accounts = s.db.C("accounts")
	s.runner = txn.NewRunner(s.tc)
}

func (s *S) TearDownTest(c *C) {
	txn.SetLogger(nil)
	txn.SetDebug(false)
	s.session.Close()
}

// Account mirrors the documents stored in the "accounts" collection.
type Account struct {
	Id      int `bson:"_id"`
	Balance int
}
// TestDocExists verifies that DocExists/DocMissing assertions succeed
// or abort depending on whether the document is actually present.
func (s *S) TestDocExists(c *C) {
	err := s.accounts.Insert(M{"_id": 0, "balance": 300})
	c.Assert(err, IsNil)

	exists := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Assert: txn.DocExists,
	}}
	missing := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Assert: txn.DocMissing,
	}}

	err = s.runner.Run(exists, "", nil)
	c.Assert(err, IsNil)
	err = s.runner.Run(missing, "", nil)
	c.Assert(err, Equals, txn.ErrAborted)

	err = s.accounts.RemoveId(0)
	c.Assert(err, IsNil)

	err = s.runner.Run(exists, "", nil)
	c.Assert(err, Equals, txn.ErrAborted)
	err = s.runner.Run(missing, "", nil)
	c.Assert(err, IsNil)
}

// TestInsert checks that Insert is a no-op when the document already
// exists and creates the document otherwise.
func (s *S) TestInsert(c *C) {
	err := s.accounts.Insert(M{"_id": 0, "balance": 300})
	c.Assert(err, IsNil)

	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Insert: M{"balance": 200},
	}}

	err = s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	// Existing document keeps its original balance.
	var account Account
	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 300)

	ops[0].Id = 1
	err = s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	err = s.accounts.FindId(1).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 200)
}

// TestInsertStructID exercises struct-typed document ids.
func (s *S) TestInsertStructID(c *C) {
	type id struct {
		FirstName string
		LastName  string
	}
	ops := []txn.Op{{
		C:      "accounts",
		Id:     id{FirstName: "John", LastName: "Jones"},
		Assert: txn.DocMissing,
		Insert: M{"balance": 200},
	}, {
		C:      "accounts",
		Id:     id{FirstName: "Sally", LastName: "Smith"},
		Assert: txn.DocMissing,
		Insert: M{"balance": 800},
	}}

	err := s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	n, err := s.accounts.Find(nil).Count()
	c.Assert(err, IsNil)
	c.Assert(n, Equals, 2)
}

// TestRemove checks that Remove deletes the document and that
// removing an already-missing document still succeeds.
func (s *S) TestRemove(c *C) {
	err := s.accounts.Insert(M{"_id": 0, "balance": 300})
	c.Assert(err, IsNil)

	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Remove: true,
	}}

	err = s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	err = s.accounts.FindId(0).One(nil)
	c.Assert(err, Equals, mgo.ErrNotFound)

	err = s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)
}
// TestUpdate applies an update to one account and then confirms the
// sibling account was left untouched.
func (s *S) TestUpdate(c *C) {
	var err error
	err = s.accounts.Insert(M{"_id": 0, "balance": 200})
	c.Assert(err, IsNil)
	err = s.accounts.Insert(M{"_id": 1, "balance": 200})
	c.Assert(err, IsNil)

	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 100}},
	}}

	err = s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	var account Account
	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 300)

	// ops is retargeted at account 1 but deliberately not run again;
	// account 1 must still hold its original balance.
	ops[0].Id = 1
	err = s.accounts.FindId(1).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 200)
}

// TestInsertUpdate runs an Insert followed by an Update on the same
// document inside one transaction, twice.
func (s *S) TestInsertUpdate(c *C) {
	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Insert: M{"_id": 0, "balance": 200},
	}, {
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 100}},
	}}

	err := s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	var account Account
	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 300)

	// Second run: insert is skipped, the update still applies.
	err = s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 400)
}

// TestUpdateInsert reverses the order: the update targets a missing
// document (no-op) before the insert creates it.
func (s *S) TestUpdateInsert(c *C) {
	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 100}},
	}, {
		C:      "accounts",
		Id:     0,
		Insert: M{"_id": 0, "balance": 200},
	}}

	err := s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	var account Account
	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 200)

	// Second run: document now exists, so the update applies and the
	// insert is skipped.
	err = s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 300)
}

// TestInsertRemoveInsert cycles the same id through insert, remove
// and insert again within one transaction.
func (s *S) TestInsertRemoveInsert(c *C) {
	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Insert: M{"_id": 0, "balance": 200},
	}, {
		C:      "accounts",
		Id:     0,
		Remove: true,
	}, {
		C:      "accounts",
		Id:     0,
		Insert: M{"_id": 0, "balance": 300},
	}}

	err := s.runner.Run(ops, "", nil)
	c.Assert(err, IsNil)

	var account Account
	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 300)
}
// TestQueueStashing kills every transaction at the set-applying
// breakpoint so four transactions stack up on one document, then
// resumes only the last one and checks that the whole queue was
// applied in order.
func (s *S) TestQueueStashing(c *C) {
	txn.SetChaos(txn.Chaos{
		KillChance: 1,
		Breakpoint: "set-applying",
	})

	opses := [][]txn.Op{{{
		C:      "accounts",
		Id:     0,
		Insert: M{"balance": 100},
	}}, {{
		C:      "accounts",
		Id:     0,
		Remove: true,
	}}, {{
		C:      "accounts",
		Id:     0,
		Insert: M{"balance": 200},
	}}, {{
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 100}},
	}}}

	var last bson.ObjectId
	for _, ops := range opses {
		last = bson.NewObjectId()
		err := s.runner.Run(ops, last, nil)
		c.Assert(err, Equals, txn.ErrChaos)
	}

	txn.SetChaos(txn.Chaos{})
	err := s.runner.Resume(last)
	c.Assert(err, IsNil)

	var account Account
	err = s.accounts.FindId(0).One(&account)
	c.Assert(err, IsNil)
	c.Assert(account.Balance, Equals, 300)
}

// TestInfo checks that the info argument ends up under the "i" field
// of the transaction document.
func (s *S) TestInfo(c *C) {
	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Assert: txn.DocMissing,
	}}

	id := bson.NewObjectId()
	err := s.runner.Run(ops, id, M{"n": 42})
	c.Assert(err, IsNil)

	var t struct{ I struct{ N int } }
	err = s.tc.FindId(id).One(&t)
	c.Assert(err, IsNil)
	c.Assert(t.I.N, Equals, 42)
}

// TestErrors feeds Run a series of malformed operations and expects
// a validation error for each, before any chaos can trigger.
func (s *S) TestErrors(c *C) {
	doc := bson.M{"foo": 1}
	tests := []txn.Op{{
		C:  "c",
		Id: 0,
	}, {
		C:      "c",
		Id:     0,
		Insert: doc,
		Remove: true,
	}, {
		C:      "c",
		Id:     0,
		Insert: doc,
		Update: doc,
	}, {
		C:      "c",
		Id:     0,
		Update: doc,
		Remove: true,
	}, {
		C:      "c",
		Assert: doc,
	}, {
		Id:     0,
		Assert: doc,
	}}

	txn.SetChaos(txn.Chaos{KillChance: 1.0})
	for _, op := range tests {
		c.Logf("op: %v", op)
		err := s.runner.Run([]txn.Op{op}, "", nil)
		c.Assert(err, ErrorMatches, "error in transaction op 0: .*")
	}
}
func (s *S) TestAssertNestedOr(c *C) {
// Assert uses $or internally. Ensure nesting works.
err := s.accounts.Insert(M{"_id": 0, "balance": 300})
c.Assert(err, IsNil)
ops := []txn.Op{{
C: "accounts",
Id: 0,
Assert: bson.D{{"$or", []bson.D{{{"balance", 100}}, {{"balance", 300}}}}},
Update: bson.D{{"$inc", bson.D{{"balance", 100}}}},
}}
err = s.runner.Run(ops, "", nil)
c.Assert(err, IsNil)
var account Account
err = s.accounts.FindId(0).One(&account)
c.Assert(err, IsNil)
c.Assert(account.Balance, Equals, 400)
}
// TestVerifyFieldOrdering checks that insert payloads supplied as a
// bson.D keep their field order when stored; a map used internally
// would scramble it.
func (s *S) TestVerifyFieldOrdering(c *C) {
	fields := bson.D{{"a", 1}, {"b", 2}, {"c", 3}}
	err := s.runner.Run([]txn.Op{{
		C:      "accounts",
		Id:     0,
		Insert: fields,
	}}, "", nil)
	c.Assert(err, IsNil)

	var stored bson.D
	err = s.accounts.FindId(0).One(&stored)
	c.Assert(err, IsNil)

	// Keep only the inserted fields, in the order they came back,
	// dropping txn bookkeeping fields such as _id.
	var got bson.D
	for _, elem := range stored {
		if elem.Name == "a" || elem.Name == "b" || elem.Name == "c" {
			got = append(got, elem)
		}
	}
	c.Assert(got, DeepEquals, fields)
}
// TestChangeLog verifies that, once a change log collection is set on
// the runner, each transaction records per collection the documents it
// touched ("d") and their resulting revnos ("r"), with removals logged
// as negated revnos and assert-only ops not logged at all.
func (s *S) TestChangeLog(c *C) {
	chglog := s.db.C("chglog")
	s.runner.ChangeLog(chglog)
	ops := []txn.Op{{
		C:      "debts",
		Id:     0,
		Assert: txn.DocMissing,
	}, {
		C:      "accounts",
		Id:     0,
		Insert: M{"balance": 300},
	}, {
		C:      "accounts",
		Id:     1,
		Insert: M{"balance": 300},
	}, {
		C:      "people",
		Id:     "joe",
		Insert: M{"accounts": []int64{0, 1}},
	}}
	id := bson.NewObjectId()
	err := s.runner.Run(ops, id, nil)
	c.Assert(err, IsNil)

	// Mirror of the change log entry layout: "d" lists doc ids, "r"
	// the matching revnos.
	type IdList []interface{}
	type Log struct {
		Docs   IdList  "d"
		Revnos []int64 "r"
	}
	var m map[string]*Log
	err = chglog.FindId(id).One(&m)
	c.Assert(err, IsNil)

	// Inserts leave the documents at revno 2; the assert-only op on
	// "debts" must produce no log entry.
	c.Assert(m["accounts"], DeepEquals, &Log{IdList{0, 1}, []int64{2, 2}})
	c.Assert(m["people"], DeepEquals, &Log{IdList{"joe"}, []int64{2}})
	c.Assert(m["debts"], IsNil)

	ops = []txn.Op{{
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 100}},
	}, {
		C:      "accounts",
		Id:     1,
		Update: M{"$inc": M{"balance": 100}},
	}}
	id = bson.NewObjectId()
	err = s.runner.Run(ops, id, nil)
	c.Assert(err, IsNil)

	m = nil
	err = chglog.FindId(id).One(&m)
	c.Assert(err, IsNil)

	// Updates bump the revnos to 3; untouched collections are absent.
	c.Assert(m["accounts"], DeepEquals, &Log{IdList{0, 1}, []int64{3, 3}})
	c.Assert(m["people"], IsNil)

	ops = []txn.Op{{
		C:      "accounts",
		Id:     0,
		Remove: true,
	}, {
		C:      "people",
		Id:     "joe",
		Remove: true,
	}}
	id = bson.NewObjectId()
	err = s.runner.Run(ops, id, nil)
	c.Assert(err, IsNil)

	m = nil
	err = chglog.FindId(id).One(&m)
	c.Assert(err, IsNil)

	// Removals are logged with negated revnos.
	c.Assert(m["accounts"], DeepEquals, &Log{IdList{0}, []int64{-4}})
	c.Assert(m["people"], DeepEquals, &Log{IdList{"joe"}, []int64{-3}})
}
// TestPurgeMissing exercises Runner.PurgeMissing: transactions that are
// interrupted by chaos and then have their transaction document removed
// leave documents referencing a missing txn, which must be purged
// before pending transactions can run or be resumed.
func (s *S) TestPurgeMissing(c *C) {
	txn.SetChaos(txn.Chaos{
		KillChance: 1,
		Breakpoint: "set-applying",
	})

	err := s.accounts.Insert(M{"_id": 0, "balance": 100})
	c.Assert(err, IsNil)
	err = s.accounts.Insert(M{"_id": 1, "balance": 100})
	c.Assert(err, IsNil)

	ops1 := []txn.Op{{
		C:      "accounts",
		Id:     3,
		Insert: M{"balance": 100},
	}}

	ops2 := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Remove: true,
	}, {
		C:      "accounts",
		Id:     1,
		Update: M{"$inc": M{"balance": 100}},
	}, {
		C:      "accounts",
		Id:     2,
		Insert: M{"balance": 100},
	}}

	first := bson.NewObjectId()
	c.Logf("---- Running ops1 under transaction %q, to be canceled by chaos", first.Hex())
	err = s.runner.Run(ops1, first, nil)
	c.Assert(err, Equals, txn.ErrChaos)

	last := bson.NewObjectId()
	c.Logf("---- Running ops2 under transaction %q, to be canceled by chaos", last.Hex())
	err = s.runner.Run(ops2, last, nil)
	c.Assert(err, Equals, txn.ErrChaos)

	// Deleting the second transaction's document leaves the accounts it
	// touched pointing at a transaction that no longer exists.
	c.Logf("---- Removing transaction %q", last.Hex())
	err = s.tc.RemoveId(last)
	c.Assert(err, IsNil)

	c.Logf("---- Disabling chaos and attempting to resume all")
	txn.SetChaos(txn.Chaos{})
	err = s.runner.ResumeAll()
	c.Assert(err, IsNil)

	again := bson.NewObjectId()
	c.Logf("---- Running ops2 again under transaction %q, to fail for missing transaction", again.Hex())
	err = s.runner.Run(ops2, again, nil)
	c.Assert(err, ErrorMatches, "cannot find transaction .*")

	c.Logf("---- Purging missing transactions")
	err = s.runner.PurgeMissing("accounts")
	c.Assert(err, IsNil)

	c.Logf("---- Resuming pending transactions")
	err = s.runner.ResumeAll()
	c.Assert(err, IsNil)

	// A Balance of -1 means the account must not exist at all.
	expect := []struct{ Id, Balance int }{
		{0, -1},
		{1, 200},
		{2, 100},
		{3, 100},
	}
	var got Account
	for _, want := range expect {
		err = s.accounts.FindId(want.Id).One(&got)
		if want.Balance == -1 {
			if err != mgo.ErrNotFound {
				// Fix: the original call passed only err, leaving the %d
				// verb with no argument; want.Id was missing.
				c.Errorf("Account %d should not exist, find got err=%#v", want.Id, err)
			}
		} else if err != nil {
			c.Errorf("Account %d should have balance of %d, but wasn't found", want.Id, want.Balance)
		} else if got.Balance != want.Balance {
			c.Errorf("Account %d should have balance of %d, got %d", want.Id, want.Balance, got.Balance)
		}
	}
}
// TestTxnQueueStashStressTest hammers the stash handling with many
// concurrent runners repeatedly inserting the same document ids, with
// chaos-induced slowdowns to widen race windows.
func (s *S) TestTxnQueueStashStressTest(c *C) {
	txn.SetChaos(txn.Chaos{
		SlowdownChance: 0.3,
		Slowdown:       50 * time.Millisecond,
	})
	defer txn.SetChaos(txn.Chaos{})

	// Debug logging off so more iterations fit in the same time.
	txn.SetDebug(false)

	const (
		runners = 10
		inserts = 10
		repeat  = 100
	)

	for round := 0; round < repeat; round++ {
		var wg sync.WaitGroup
		wg.Add(runners)
		for worker := 0; worker < runners; worker++ {
			go func(worker, round int) {
				defer wg.Done()
				// Each goroutine gets its own session and runner.
				session := s.session.New()
				defer session.Close()
				runner := txn.NewRunner(s.tc.With(session))
				for j := 0; j < inserts; j++ {
					ops := []txn.Op{{
						C:  "accounts",
						Id: fmt.Sprintf("insert-%d-%d", round, j),
						Insert: bson.M{
							"added-by": worker,
						},
					}}
					err := runner.Run(ops, "", nil)
					// Aborts are expected: the other workers race to
					// insert the same ids.
					if err != txn.ErrAborted {
						c.Check(err, IsNil)
					}
				}
			}(worker, round)
		}
		wg.Wait()
	}
}
// TestPurgeMissingPipelineSizeLimit ensures that PurgeMissing can
// handle very large txn-queue fields. Previous iterations of
// PurgeMissing would trigger a 16MB aggregation pipeline result size
// limit when run against documents or stashes with large numbers of
// txn-queue entries. PurgeMissing now no longer uses aggregation
// pipelines to work around this limit.
func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
	// The pipeline result size limitation was removed from MongoDB in
	// 2.6 so this test is only run for older MongoDB versions.
	build, err := s.session.BuildInfo()
	c.Assert(err, IsNil)
	if build.VersionAtLeast(2, 6) {
		c.Skip("This tests a problem that can only happen with MongoDB < 2.6 ")
	}

	// Insert a single document to work with.
	err = s.accounts.Insert(M{"_id": 0, "balance": 100})
	c.Assert(err, IsNil)

	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 100}},
	}}

	// Generate one successful transaction.
	good := bson.NewObjectId()
	c.Logf("---- Running ops under transaction %q", good.Hex())
	err = s.runner.Run(ops, good, nil)
	c.Assert(err, IsNil)

	// Generate another transaction which will go missing.
	missing := bson.NewObjectId()
	c.Logf("---- Running ops under transaction %q (which will go missing)", missing.Hex())
	err = s.runner.Run(ops, missing, nil)
	c.Assert(err, IsNil)

	err = s.tc.RemoveId(missing)
	c.Assert(err, IsNil)

	// Generate a txn-queue on the test document that's large enough
	// that it used to cause PurgeMissing to exceed MongoDB's pipeline
	// result 16MB size limit (MongoDB 2.4 and older only).
	//
	// The contents of the txn-queue field doesn't matter, only that
	// it's big enough to trigger the size limit. The required size
	// can also be achieved by using multiple documents as long as the
	// cumulative size of all the txn-queue fields exceeds the
	// pipeline limit. A single document is easier to work with for
	// this test however.
	//
	// The txn id of the successful transaction is used to fill the
	// txn-queue because this takes advantage of a short circuit in
	// PurgeMissing, dramatically speeding up the test run time.
	const fakeQueueLen = 250000
	fakeTxnQueue := make([]string, fakeQueueLen)
	token := good.Hex() + "_12345678" // txn id + nonce
	for i := 0; i < fakeQueueLen; i++ {
		fakeTxnQueue[i] = token
	}

	err = s.accounts.UpdateId(0, bson.M{
		"$set": bson.M{"txn-queue": fakeTxnQueue},
	})
	c.Assert(err, IsNil)

	// PurgeMissing could hit the same pipeline result size limit when
	// processing the txn-queue fields of stash documents so insert
	// the large txn-queue there too to ensure that no longer happens.
	err = s.sc.Insert(
		bson.D{{"c", "accounts"}, {"id", 0}},
		bson.M{"txn-queue": fakeTxnQueue},
	)
	c.Assert(err, IsNil)

	c.Logf("---- Purging missing transactions")
	err = s.runner.PurgeMissing("accounts")
	c.Assert(err, IsNil)
}
// flaky gates tests that are known to fail intermittently; enable them
// by running the tests with the -flaky flag.
var flaky = flag.Bool("flaky", false, "Include flaky tests")
// TestTxnQueueStressTest runs concurrent increments against two
// accounts, half of the runners touching them in one order and half in
// the opposite order, and then verifies that no update was lost.
func (s *S) TestTxnQueueStressTest(c *C) {
	// This fails about 20% of the time on Mongo 3.2 (I haven't tried
	// other versions) with account balance being 3999 instead of
	// 4000. That implies that some updates are being lost. This is
	// bad and we'll need to chase it down in the near future - the
	// only reason it's being skipped now is that it's already failing
	// and it's better to have the txn tests running without this one
	// than to have them not running at all.
	if !*flaky {
		c.Skip("Fails intermittently - disabling until fixed")
	}
	txn.SetChaos(txn.Chaos{
		SlowdownChance: 0.3,
		Slowdown:       50 * time.Millisecond,
	})
	defer txn.SetChaos(txn.Chaos{})

	// So we can run more iterations of the test in less time.
	txn.SetDebug(false)

	err := s.accounts.Insert(M{"_id": 0, "balance": 0}, M{"_id": 1, "balance": 0})
	c.Assert(err, IsNil)

	// Run half of the operations changing account 0 and then 1,
	// and the other half in the opposite order.
	ops01 := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 1}},
	}, {
		C:      "accounts",
		Id:     1,
		Update: M{"$inc": M{"balance": 1}},
	}}

	ops10 := []txn.Op{{
		C:      "accounts",
		Id:     1,
		Update: M{"$inc": M{"balance": 1}},
	}, {
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 1}},
	}}

	ops := [][]txn.Op{ops01, ops10}

	const runners = 4
	const changes = 1000

	var wg sync.WaitGroup
	wg.Add(runners)
	for n := 0; n < runners; n++ {
		n := n
		go func() {
			defer wg.Done()
			for i := 0; i < changes; i++ {
				// Fix: use a goroutine-local err instead of sharing the
				// outer err across goroutines (a data race under -race),
				// and report failures with Check, which unlike Assert is
				// safe to call from goroutines other than the test's.
				err := s.runner.Run(ops[n%2], "", nil)
				if !c.Check(err, IsNil) {
					return
				}
			}
		}()
	}
	wg.Wait()

	for id := 0; id < 2; id++ {
		var account Account
		err = s.accounts.FindId(id).One(&account)
		// Fix: previously this error was ignored, so a missing document
		// would be reported as a zero balance instead of a lookup failure.
		c.Assert(err, IsNil)
		if account.Balance != runners*changes {
			c.Errorf("Account should have balance of %d, got %d", runners*changes, account.Balance)
		}
	}
}