golang流媒体网站
golang流媒体网站
架构介绍
API
实体类定义
包含请求封装,返回封装,登录信息等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package defs
//reqeusts
type UserCredential struct {
Username string `json:"user_name"`
Pwd string `json:"pwd"`
}
//response
type SignedUp struct {
Success bool `json:"success"`
SessionId string `json:"session_id"`
}
// Data model
//视频信息
type VideoInfo struct {
Id string
AuthorId int
Name string
DisplayCtime string
}
//评论
type Comment struct {
Id string
VideoId string
Author string
Content string
}
type SimpleSession struct {
Username string //login name
TTL int64
}
错误处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package defs
type Err struct {
Error string `json:"error"`
ErrorCode string `json:"error_code"`
}
type ErrResponse struct {
HttpSC int
Error Err
}
var (
ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}}
ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}}
ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}}
ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}}
)
返回封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main
import (
"io"
"encoding/json"
"net/http"
"github.com/avenssi/video_server/api/defs"
)
func sendErrorResponse(w http.ResponseWriter, errResp defs.ErrResponse) {
w.WriteHeader(errResp.HttpSC)
resStr, _ := json.Marshal(&errResp.Error)
io.WriteString(w, string(resStr))
}
func sendNormalResponse(w http.ResponseWriter, resp string, sc int) {
w.WriteHeader(sc)
io.WriteString(w, resp)
}
数据库连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package dbops
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
var (
dbConn *sql.DB
err error
)
func init() {
dbConn, err = sql.Open("mysql", "root:123!@#@tcp(localhost:3306)/video_server?charset=utf8")
if err != nil {
panic(err.Error())
工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package utils
import (
"crypto/rand"
"io"
"fmt"
)
func NewUUID() (string, error) {
uuid := make([]byte, 16)
n, err := io.ReadFull(rand.Reader, uuid)
if n != len(uuid) || err != nil {
return "", err
}
// variant bits; see section 4.1.1
uuid[8] = uuid[8]&^0xc0 | 0x80
// version 4 (pseudo-random); see section 4.1.3
uuid[6] = uuid[6]&^0xf0 | 0x40
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}
用户操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package dbops
import (
"time"
"log"
"database/sql"
_ "github.com/go-sql-driver/mysql"
"github.com/avenssi/video_server/api/defs"
"github.com/avenssi/video_server/api/utils"
)
//添加用户
func AddUserCredential(loginName string, pwd string) error {
stmtIns, err := dbConn.Prepare("INSERT INTO users (login_name, pwd) VALUES (?, ?)")
if err != nil {
return err
}
_, err = stmtIns.Exec(loginName, pwd)
if err != nil {
return err
}
defer stmtIns.Close()
return nil
}
// 查询用户
func GetUserCredential(loginName string) (string, error) {
stmtOut, err := dbConn.Prepare("SELECT pwd FROM users WHERE login_name = ?")
if err != nil {
log.Printf("%s", err)
return "", err
}
var pwd string
err = stmtOut.QueryRow(loginName).Scan(&pwd)
if err != nil && err != sql.ErrNoRows {
return "", err
}
defer stmtOut.Close()
return pwd, nil
}
// 删除用户
func DeleteUser(loginName string, pwd string) error {
stmtDel, err := dbConn.Prepare("DELETE FROm users WHERE login_name=? AND pwd=?")
if err != nil {
log.Printf("DeleteUser error: %s", err)
return err
}
_, err = stmtDel.Exec(loginName, pwd)
if err != nil {
return err
}
defer stmtDel.Close()
return nil
}
func AddNewVideo(aid int, name string) (*defs.VideoInfo, error) {
// create uuid
vid, err := utils.NewUUID()
if err != nil {
return nil, err
}
t := time.Now()
ctime := t.Format("Jan 02 2006, 15:04:05")
stmtIns, err := dbConn.Prepare(`INSERT INTO video_info
(id, author_id, name, display_ctime) VALUES(?, ?, ?, ?)`)
if err != nil {
return nil, err
}
_, err = stmtIns.Exec(vid, aid, name, ctime)
if err != nil {
return nil, err
}
res := &defs.VideoInfo{Id: vid, AuthorId: aid, Name: name, DisplayCtime: ctime}
defer stmtIns.Close()
return res, nil
}
func GetVideoInfo(vid string) (*defs.VideoInfo, error) {
stmtOut, err := dbConn.Prepare("SELECT author_id, name, display_ctime FROM video_info WHERE id=?")
var aid int
var dct string
var name string
err = stmtOut.QueryRow(vid).Scan(&aid, &name, &dct)
if err != nil && err != sql.ErrNoRows{
return nil, err
}
if err == sql.ErrNoRows {
return nil, nil
}
defer stmtOut.Close()
res := &defs.VideoInfo{Id: vid, AuthorId: aid, Name: name, DisplayCtime: dct}
return res, nil
}
func DeleteVideoInfo(vid string) error {
stmtDel, err := dbConn.Prepare("DELETE FROM video_info WHERE id=?")
if err != nil {
return err
}
_, err = stmtDel.Exec(vid)
if err != nil {
return err
}
defer stmtDel.Close()
return nil
}
func AddNewComments(vid string, aid int, content string) error {
id, err := utils.NewUUID()
if err != nil {
return err
}
stmtIns, err := dbConn.Prepare("INSERT INTO comments (id, video_id, author_id, content) values (?, ?, ?, ?)")
if err != nil {
return err
}
_, err = stmtIns.Exec(id, vid, aid, content)
if err != nil {
return err
}
defer stmtIns.Close()
return nil
}
func ListComments(vid string, from, to int) ([]*defs.Comment, error) {
stmtOut, err := dbConn.Prepare(` SELECT comments.id, users.Login_name, comments.content FROM comments
INNER JOIN users ON comments.author_id = users.id
WHERE comments.video_id = ? AND comments.time > FROM_UNIXTIME(?) AND comments.time <= FROM_UNIXTIME(?)`)
var res []*defs.Comment
rows, err := stmtOut.Query(vid, from, to)
if err != nil {
return res, err
}
for rows.Next() {
var id, name, content string
if err := rows.Scan(&id, &name, &content); err != nil {
return res, err
}
c := &defs.Comment{Id: id, VideoId: vid, Author: name, Content: content}
res = append(res, c)
}
defer stmtOut.Close()
return res, nil
}
session处理
与数据库的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package dbops
import (
"strconv"
"sync"
"log"
"database/sql"
"github.com/avenssi/video_server/api/defs"
)
func InsertSession(sid string, ttl int64, uname string) error {
ttlstr := strconv.FormatInt(ttl, 10)
stmtIns, err := dbConn.Prepare("INSERT INTO sessions (session_id, TTL, login_name) VALUES (?, ?, ?)")
if err != nil {
return err
}
_, err = stmtIns.Exec(sid, ttlstr, uname)
if err != nil {
return err
}
defer stmtIns.Close()
return nil
}
func RetrieveSession(sid string) (*defs.SimpleSession, error) {
ss := &defs.SimpleSession{}
stmtOut, err := dbConn.Prepare("SELECT TTL, login_name FROM sessions WHERE session_id=?")
if err != nil {
return nil, err
}
var ttl string
var uname string
stmtOut.QueryRow(sid).Scan(&ttl, &uname)
if err != nil && err != sql.ErrNoRows{
return nil, err
}
if res, err := strconv.ParseInt(ttl, 10, 64); err == nil {
ss.TTL = res
ss.Username = uname
} else {
return nil, err
}
defer stmtOut.Close()
return ss, nil
}
func RetrieveAllSessions() (*sync.Map, error) {
m := &sync.Map{}
stmtOut, err := dbConn.Prepare("SELECT * FROM sessions")
if err != nil {
log.Printf("%s", err)
return nil, err
}
rows, err := stmtOut.Query()
if err != nil {
log.Printf("%s", err)
return nil, err
}
for rows.Next() {
var id string
var ttlstr string
var login_name string
if er := rows.Scan(&id, &ttlstr, &login_name); er != nil {
log.Printf("retrive sessions error: %s", er)
break
}
if ttl, err1 := strconv.ParseInt(ttlstr, 10, 64); err1 == nil{
ss := &defs.SimpleSession{Username: login_name, TTL: ttl}
m.Store(id, ss)
log.Printf(" session id: %s, ttl: %d", id, ss.TTL)
}
}
return m, nil
}
func DeleteSession(sid string) error {
stmtOut, err := dbConn.Prepare("DELETE FROM sessions WHERE session_id = ?")
if err != nil {
log.Printf("%s", err)
return err
}
if _, err := stmtOut.Query(sid); err != nil {
return err
}
return nil
}
与业务的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package session
import (
"time"
"sync"
"github.com/avenssi/video_server/api/defs"
"github.com/avenssi/video_server/api/dbops"
"github.com/avenssi/video_server/api/utils"
)
var sessionMap *sync.Map
func init() {
sessionMap = &sync.Map{}
}
func nowInMilli() int64{
return time.Now().UnixNano()/1000000
}
func deleteExpiredSession(sid string) {
sessionMap.Delete(sid)
dbops.DeleteSession(sid)
}
func LoadSessionsFromDB() {
r, err := dbops.RetrieveAllSessions()
if err != nil {
return
}
r.Range(func(k, v interface{}) bool{
ss := v.(*defs.SimpleSession)
sessionMap.Store(k, ss)
return true
})
}
func GenerateNewSessionId(un string) string {
id, _ := utils.NewUUID()
ct := nowInMilli()
ttl := ct + 30 * 60 * 1000// Severside session valid time: 30 min
ss := &defs.SimpleSession{Username: un, TTL: ttl}
sessionMap.Store(id, ss)
dbops.InsertSession(id, ttl, un)
return id
}
func IsSessionExpired(sid string) (string, bool) {
ss, ok := sessionMap.Load(sid)
if ok {
ct := nowInMilli()
if ss.(*defs.SimpleSession).TTL < ct {
deleteExpiredSession(sid)
return "", true
}
return ss.(*defs.SimpleSession).Username, false
}
return "", true
}
校验权限
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main
import (
"net/http"
"github.com/avenssi/video_server/api/session"
"github.com/avenssi/video_server/api/defs"
)
var HEADER_FIELD_SESSION = "X-Session-Id"
var HEADER_FIELD_UNAME = "X-User-Name"
func validateUserSession(r *http.Request) bool {
sid := r.Header.Get(HEADER_FIELD_SESSION)
if len(sid) == 0 {
return false
}
uname, ok := session.IsSessionExpired(sid)
if ok {
return false
}
r.Header.Add(HEADER_FIELD_UNAME, uname)
return true
}
func ValidateUser(w http.ResponseWriter, r *http.Request) bool {
uname := r.Header.Get(HEADER_FIELD_UNAME)
if len(uname) == 0 {
sendErrorResponse(w, defs.ErrorNotAuthUser)
return false
}
return true
}
controller入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main
import (
"io"
"encoding/json"
"net/http"
"io/ioutil"
"github.com/julienschmidt/httprouter"
"github.com/avenssi/video_server/api/defs"
"github.com/avenssi/video_server/api/dbops"
"github.com/avenssi/video_server/api/session"
)
func CreateUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
res, _ := ioutil.ReadAll(r.Body)
ubody := &defs.UserCredential{}
if err := json.Unmarshal(res, ubody); err != nil {
sendErrorResponse(w, defs.ErrorRequestBodyParseFailed)
return
}
if err := dbops.AddUserCredential(ubody.Username, ubody.Pwd); err != nil {
sendErrorResponse(w, defs.ErrorDBError)
return
}
id := session.GenerateNewSessionId(ubody.Username)
su := &defs.SignedUp{Success: true, SessionId: id}
if resp, err := json.Marshal(su); err != nil {
sendErrorResponse(w, defs.ErrorInternalFaults)
return
} else {
sendNormalResponse(w, string(resp), 201)
}
}
func Login(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
uname := p.ByName("user_name")
io.WriteString(w, uname)
}
main主方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
)
type middleWareHandler struct {
r *httprouter.Router
}
func NewMiddleWareHandler(r *httprouter.Router) http.Handler {
m := middleWareHandler{}
m.r = r
return m
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//check session
validateUserSession(r)
m.r.ServeHTTP(w, r)
}
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.POST("/user", CreateUser)
router.POST("/user/:user_name", Login)
return router
}
func main() {
r := RegisterHandlers()
mh := NewMiddleWareHandler(r)
http.ListenAndServe(":8000", mh)
}
streamServer
常量
1
2
3
4
5
6
package main
const (
VIDEO_DIR = "./videos/"
MAX_UPLOAD_SIZE = 1024 * 1024 * 50 //50MB
)
返回对象封装
1
2
3
4
5
6
7
8
9
10
11
package main
import (
"io"
"net/http"
)
func sendErrorResponse(w http.ResponseWriter, sc int, errMsg string) {
w.WriteHeader(sc)
io.WriteString(w, errMsg)
}
限流处理
限流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main
import (
"log"
)
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter {
concurrentConn: cc,
bucket: make(chan int, cc),
}
}
func (cl *ConnLimiter) GetConn() bool {
if len(cl.bucket) >= cl.concurrentConn {
log.Printf("Reached the rate limitation.")
return false
}
cl.bucket <- 1
return true
}
func (cl *ConnLimiter) ReleaseConn() {
c :=<- cl.bucket
log.Printf("New connction coming: %d", c)
}
接口
接口api
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main
import (
"io"
"os"
"net/http"
"html/template"
"io/ioutil"
"time"
"log"
"github.com/julienschmidt/httprouter"
)
func testPageHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
t, _ := template.ParseFiles("./videos/upload.html")
t.Execute(w, nil)
}
func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
vid := p.ByName("vid-id")
vl := VIDEO_DIR + vid
video, err := os.Open(vl)
if err != nil {
log.Printf("Error when try to open file: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.Header().Set("Content-Type", "video/mp4")
http.ServeContent(w, r, "", time.Now(), video)
defer video.Close()
}
func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE)
if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {
sendErrorResponse(w, http.StatusBadRequest, "File is too big")
return
}
file, _, err := r.FormFile("file")
if err != nil {
log.Printf("Error when try to get file: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
data, err := ioutil.ReadAll(file)
if err != nil {
log.Printf("Read file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
}
fn := p.ByName("vid-id")
err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666)
if err != nil {
log.Printf("Write file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.WriteHeader(http.StatusCreated)
io.WriteString(w, "Uploaded successfully")
}
main主函数
主函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
)
type middleWareHandler struct {
r *httprouter.Router
l *ConnLimiter
}
func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {
m := middleWareHandler{}
m.r = r
m.l = NewConnLimiter(cc)
return m
}
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.GET("/videos/:vid-id", streamHandler)
router.POST("/upload/:vid-id", uploadHandler)
router.GET("/testpage", testPageHandler)
return router
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !m.l.GetConn() {
sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests")
return
}
m.r.ServeHTTP(w, r)
defer m.l.ReleaseConn()
}
func main() {
r := RegisterHandlers()
mh := NewMiddleWareHandler(r, 2)
http.ListenAndServe(":9000", mh)
}
scheduler异步任务
流程
用户—》调用api—》删除视频
api service—》scheduler任务—》把删除视频记录写入数据库
timer—》runner—》读取数据库中的删除记录—》执行操作—》删除视频文件
连接数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package dbops
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
var (
dbConn *sql.DB
err error
)
func init() {
dbConn, err = sql.Open("mysql", "root:123!@#@tcp(localhost:3306)/video_server?charset=utf8")
if err != nil {
panic(err.Error())
}
}
数据库操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package dbops
import (
"log"
_ "github.com/go-sql-driver/mysql"
)
func AddVideoDeletionRecord(vid string) error {
stmtIns, err := dbConn.Prepare("INSERT INTO video_del_rec (video_id) VALUES(?)")
if err != nil {
return err
}
_, err = stmtIns.Exec(vid)
if err != nil {
log.Printf("AddVideoDeletionRecord error: %v", err)
return err
}
defer stmtIns.Close()
return nil
}
数据操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package dbops
import (
"log"
_ "github.com/go-sql-driver/mysql"
)
func ReadVideoDeletionRecord(count int) ([]string, error) {
stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?")
var ids []string
if err != nil {
return ids, err
}
rows, err := stmtOut.Query(count)
if err != nil {
log.Printf("Query VideoDeletionRecord error: %v", err)
return ids, err
}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return ids, err
}
ids = append(ids, id)
}
defer stmtOut.Close()
return ids, nil
}
func DelVideoDeletionRecord(vid string) error {
stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?")
if err != nil {
return err
}
_, err = stmtDel.Exec(vid)
if err != nil {
log.Printf("Deleting VideoDeletionRecord error: %v", err)
return err
}
defer stmtDel.Close()
return nil
}
常量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package taskrunner
const (
READY_TO_DISPATCH = "d"
READY_TO_EXECUTE = "e"
CLOSE = "c"
VIDEO_PATH = "./videos/"
)
type controlChan chan string
type dataChan chan interface{}
type fn func(dc dataChan) error
runner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package taskrunner
import (
)
type Runner struct {
Controller controlChan
Error controlChan
Data dataChan
dataSize int
longLived bool
Dispatcher fn
Executor fn
}
func NewRunner(size int, longlived bool, d fn, e fn) *Runner {
return &Runner {
Controller: make(chan string, 1),
Error: make(chan string, 1),
Data: make(chan interface{}, size),
longLived: longlived,
dataSize: size,
Dispatcher: d,
Executor: e,
}
}
func (r *Runner) startDispatch() {
defer func() {
if !r.longLived {
close(r.Controller)
close(r.Data)
close(r.Error)
}
}()
for {
select {
case c :=<- r.Controller:
if c == READY_TO_DISPATCH {
err := r.Dispatcher(r.Data)
if err != nil {
r.Error <- CLOSE
} else {
r.Controller <- READY_TO_EXECUTE
}
}
if c == READY_TO_EXECUTE {
err := r.Executor(r.Data)
if err != nil {
r.Error <- CLOSE
} else {
r.Controller <- READY_TO_DISPATCH
}
}
case e :=<- r.Error:
if e == CLOSE {
return
}
default:
}
}
}
func (r *Runner) StartAll() {
r.Controller <- READY_TO_DISPATCH
r.startDispatch()
}
task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package taskrunner
import (
"os"
"errors"
"log"
"sync"
"github.com/avenssi/video_server/scheduler/dbops"
)
func deleteVideo(vid string) error {
err := os.Remove(VIDEO_PATH + vid)
if err != nil && !os.IsNotExist(err) {
log.Printf("Deleting video error: %v", err)
return err
}
return nil
}
func VideoClearDispatcher(dc dataChan) error {
res, err := dbops.ReadVideoDeletionRecord(3)
if err != nil {
log.Printf("Video clear dispatcher error: %v", err)
return err
}
if len(res) == 0 {
return errors.New("All tasks finished")
}
for _, id := range res {
dc <- id
}
return nil
}
func VideoClearExecutor(dc dataChan) error {
errMap := &sync.Map{}
var err error
forloop:
for {
select {
case vid :=<- dc:
go func(id interface{}) {
if err := deleteVideo(id.(string)); err != nil {
errMap.Store(id, err)
return
}
if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil {
errMap.Store(id, err)
return
}
}(vid)
default:
break forloop
}
}
errMap.Range(func(k, v interface{}) bool {
err = v.(error)
if err != nil {
return false
}
return true
})
return err
}
启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package taskrunner
import (
"time"
//"log"
)
type Worker struct {
ticker *time.Ticker
runner *Runner
}
func NewWorker(interval time.Duration, r *Runner) *Worker {
return &Worker {
ticker: time.NewTicker(interval * time.Second),
runner: r,
}
}
func (w *Worker) startWorker() {
for {
select {
case <- w.ticker.C:
go w.runner.StartAll()
}
}
}
func Start() {
// Start video file cleaning
r := NewRunner(3, true, VideoClearDispatcher, VideoClearExecutor)
w := NewWorker(3, r)
go w.startWorker()
}
response
1
2
3
4
5
6
7
8
9
10
11
package main
import (
"net/http"
"io"
)
func sendResponse(w http.ResponseWriter, sc int, resp string) {
w.WriteHeader(sc)
io.WriteString(w, resp)
}
Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
"github.com/avenssi/video_server/scheduler/dbops"
)
func vidDelRecHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params){
vid := p.ByName("vid-id")
if len(vid) == 0 {
sendResponse(w, 400, "video id should not be empty")
return
}
err := dbops.AddVideoDeletionRecord(vid)
if err != nil {
sendResponse(w, 500, "Internal server error")
return
}
sendResponse(w, 200, "")
return
}
main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
"github.com/avenssi/video_server/scheduler/taskrunner"
)
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.GET("/video-delete-record/:vid-id", vidDelRecHandler)
return router
}
func main() {
go taskrunner.Start()
r := RegisterHandlers()
http.ListenAndServe(":9001", r)
}
部署
编译脚本build.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/bash
# Build web and other services
cd ~/work/src/github.com/avenssi/video_server/api
env GOOS=linux_GOARCH=amd64 go build -o ../bin/api
cd ~/work/src/github.com/avenssi/video_server/scheduler
env GOOS=linux_GOARCH=amd64 go build -o ../bin/scheduler
cd ~/work/src/github.com/avenssi/video_server/streamserver
env GOOS=linux_GOARCH=amd64 go build -0 ../bin/streamserver
cd ~/work/src/github.com/avenssi/video_server/web
env GOOS=linux GOARCH=amd64 go build -o ../bin/web
上传脚本deploy.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/bash
cp -R ./templates ./bin/
mkdir ./bin/videos
cd bin
nohup ./api &
nohup ./schduler &
nohup ./streamserver &
nohup ./web &
echo "deploy finished"
初始化数据库表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
create table comments (
id varchar(64) not null
video_id varchar(64),
author_id int(10),
content text,
time datetime default current_timestamp,
primary key(id)
);
create table sessions (
session_id tinytext not null,
TTL tinytext,
login_name text
);
alter table sessions add primary key (session_id(64))
create table users (
id int unsigned not null auto_increment,
login_name varchar(64),
pwd text not null,
unique key (login_name),
primary key (id)
);
create table video_del_rec (
video id varchar(64) not null,
primary key (video_id)
);
create table video_info (
id varchar(64) not null,
author_id int(10),
name text,
display_ctime text,
create_time datetime default current_timestamp
primary key (id)
);
本文由作者按照 CC BY 4.0 进行授权