Access Mysql in a Serializable Way
go ·Goroutine is a magic which allows us to run our logic in a single routine. When it comes to the database, the single routine allow us make access serializable. There is no more lock on the database, which would significantly ramp up the performance, although it may require us write some extra codes.
Examples
The whole logic is that we run the mysql client in a sub goroutine and use channel to pass in and out parameters. Here is an example we need to plus 1 in a field in mysql. Table counter only has three fields: (id, name, count). For a plus 1 operation, we need to read the count from mysql, then update its new value into mysql. This code is simple,
func (c *MysqlClient) doSomeSql(name string) (int, error) {
row := c.db.QueryRow("SELECT count FROM counters WHERE name = ?", name)
var count int
if err := row.Scan(&count); err != nil {
return 0, err
}
if _, err := c.db.Exec("UPDATE counters SET count = ? WHERE name = ?",
count+1, name); err != nil {
return 0, err
}
return count + 1, nil
}
Then we focus on the Mysql client, it needs a handler of connection to the database and a parameter channel as an entry to accept the request.
type Add1Param struct {
Name string
Count chan int
}
type MysqlClient struct {
db *sql.DB
add1Stream chan Add1Param
}
Then, we kick off the mysql goroutine,
func (c *MysqlClient) Run(ctx context.Context) (err error) {
cfg := mysql.Config{
User: getenv("DBUSER", ""),
Passwd: getenv("DBPASS", ""),
Net: "tcp",
Addr: fmt.Sprintf("%s:%s", getenv("DBHOST", "127.0.0.1"), getenv("DBPORT", "3306")),
DBName: "demo-brokers",
AllowNativePasswords: true,
ParseTime: true,
}
c.db, err = sql.Open("mysql", cfg.FormatDSN())
if err != nil {
return err
}
pingErr := c.db.Ping()
if pingErr != nil {
return pingErr
}
fmt.Println("Mysql Connected...")
go func() {
defer c.exit()
for {
select {
case param := <-c.add1Stream:
count, err := c.doSomeSql(param.Name)
if err != nil {
log.Fatalln(err)
}
param.Count <- count
case <-ctx.Done():
return
}
}
}()
return
}
It also needs some tricks to call mysql from another goroutine.
func (c *MysqlClient) Add1(name string) int {
param := Add1Param{
Name: name,
Count: make(chan int),
}
defer close(param.Count)
c.add1Stream <- param
return <-param.Count
}
Finally, we need to validate whether it will remain data integrity under a concurrent environment.
func TestMysqlClient_Run(t *testing.T) {
client := NewMysqlClient()
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
client.Run(ctx)
const COUNTER_NAME = "reading"
client.DeleteByName(COUNTER_NAME)
client.AddCounter(COUNTER_NAME)
var wg sync.WaitGroup
const N = 10000
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client.Add1(COUNTER_NAME)
}()
}
wg.Wait()
count := client.QueryByName(COUNTER_NAME).Count
assert.Equal(t, count, N)
}