Fix parallelly generating index failure with Mysql (#24567) (#25081)

Backport #24567 by @lunny

- Fix possible parallel creating commit status index problem and
creating issues/pull request index problem work with Mysql5/Mysql8
- Add parallel tests
- Reenable TestRepoCommitsStatusParallel on CI

Fix #22109

---------

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
This commit is contained in:
Giteabot 2023-06-05 11:05:52 -04:00 committed by GitHub
parent 15adec19e7
commit 3ef97932d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 7 deletions

View File

@ -71,10 +71,31 @@ func postgresGetNextResourceIndex(ctx context.Context, tableName string, groupID
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64) return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
} }
func mysqlGetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if _, err := GetEngine(ctx).Exec(fmt.Sprintf("INSERT INTO %s (group_id, max_index) "+
"VALUES (?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
tableName), groupID); err != nil {
return 0, err
}
var idx int64
_, err := GetEngine(ctx).SQL(fmt.Sprintf("SELECT max_index FROM %s WHERE group_id = ?", tableName), groupID).Get(&idx)
if err != nil {
return 0, err
}
if idx == 0 {
return 0, errors.New("cannot get the correct index")
}
return idx, nil
}
// GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created // GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created
func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) { func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if setting.Database.Type.IsPostgreSQL() { switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetNextResourceIndex(ctx, tableName, groupID) return postgresGetNextResourceIndex(ctx, tableName, groupID)
case setting.Database.Type.IsMySQL():
return mysqlGetNextResourceIndex(ctx, tableName, groupID)
} }
e := GetEngine(ctx) e := GetEngine(ctx)

View File

@ -63,10 +63,32 @@ func postgresGetCommitStatusIndex(ctx context.Context, repoID int64, sha string)
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64) return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
} }
func mysqlGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if _, err := db.GetEngine(ctx).Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
repoID, sha); err != nil {
return 0, err
}
var idx int64
_, err := db.GetEngine(ctx).SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?",
repoID, sha).Get(&idx)
if err != nil {
return 0, err
}
if idx == 0 {
return 0, errors.New("cannot get the correct index")
}
return idx, nil
}
// GetNextCommitStatusIndex retried 3 times to generate a resource index // GetNextCommitStatusIndex retried 3 times to generate a resource index
func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) { func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if setting.Database.Type.IsPostgreSQL() { switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetCommitStatusIndex(ctx, repoID, sha) return postgresGetCommitStatusIndex(ctx, repoID, sha)
case setting.Database.Type.IsMySQL():
return mysqlGetCommitStatusIndex(ctx, repoID, sha)
} }
e := db.GetEngine(ctx) e := db.GetEngine(ctx)
@ -74,7 +96,7 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
// try to update the max_index to next value, and acquire the write-lock for the record // try to update the max_index to next value, and acquire the write-lock for the record
res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha) res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("update failed: %w", err)
} }
affected, err := res.RowsAffected() affected, err := res.RowsAffected()
if err != nil { if err != nil {
@ -85,18 +107,18 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
_, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha) _, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha)
res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha) res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("update2 failed: %w", err)
} }
affected, err = res.RowsAffected() affected, err = res.RowsAffected()
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("RowsAffected failed: %w", err)
} }
// if the update still can not update any records, the record must not exist and there must be some errors (insert error) // if the update still can not update any records, the record must not exist and there must be some errors (insert error)
if affected == 0 { if affected == 0 {
if errIns == nil { if errIns == nil {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated") return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated")
} }
return 0, errIns return 0, fmt.Errorf("insert failed: %w", errIns)
} }
} }
@ -104,7 +126,7 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
var newIdx int64 var newIdx int64
has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx) has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("select failed: %w", err)
} }
if !has { if !has {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected") return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected")

View File

@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"sync"
"testing" "testing"
"time" "time"
@ -106,6 +108,49 @@ func TestAPICreateIssue(t *testing.T) {
assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues) assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues)
} }
func TestAPICreateIssueParallel(t *testing.T) {
defer tests.PrepareTestEnv(t)()
const body, title = "apiTestBody", "apiTestTitle"
repoBefore := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repoBefore.OwnerID})
session := loginUser(t, owner.Name)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeRepo)
urlStr := fmt.Sprintf("/api/v1/repos/%s/%s/issues?state=all&token=%s", owner.Name, repoBefore.Name, token)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(parentT *testing.T, i int) {
parentT.Run(fmt.Sprintf("ParallelCreateIssue_%d", i), func(t *testing.T) {
newTitle := title + strconv.Itoa(i)
newBody := body + strconv.Itoa(i)
req := NewRequestWithJSON(t, "POST", urlStr, &api.CreateIssueOption{
Body: newBody,
Title: newTitle,
Assignee: owner.Name,
})
resp := MakeRequest(t, req, http.StatusCreated)
var apiIssue api.Issue
DecodeJSON(t, resp, &apiIssue)
assert.Equal(t, newBody, apiIssue.Body)
assert.Equal(t, newTitle, apiIssue.Title)
unittest.AssertExistsAndLoadBean(t, &issues_model.Issue{
RepoID: repoBefore.ID,
AssigneeID: owner.ID,
Content: newBody,
Title: newTitle,
})
wg.Done()
})
}(t, i)
}
wg.Wait()
}
func TestAPIEditIssue(t *testing.T) { func TestAPIEditIssue(t *testing.T) {
defer tests.PrepareTestEnv(t)() defer tests.PrepareTestEnv(t)()