CDN下载架构设计

ECUG深圳, Dec 2013

AstaXie

CDN, SNDA

Overview

CDN下载系统

CDN下载加速主要是为最终用户提供稳定、快速、高质量的下载服务,有效提升下载服务器的处理能力,缓解服务器出口带宽压力,提升下载服务能力和用户下载体验

如下图所示:

下载系统主要过程

老系统的设计

存在的问题

针对问题解决方案

1. 一级分发先把整个文件进行逻辑的分块,映射到机房内所有的机器,并发的拉取
2. 二级分发采用机房内块copy

设计图

代码设计之任务原型

type Task struct {
    SessionID      string
    Lock           sync.RWMutex
    FromFile       string
    ToFile         string
    Blocks         []*SubBlock
    Servers        []*Server
    ServerStatus   map[string]*ServerMap
    FirstSchedule  map[string][]*SubBlock
    SecondSchedule map[string]map[string][]*SubBlock
}

代码设计之接收任务

func (this *MainController) Post() {
    //解析任务
    t, err := models.GetTask(jsoninfo)

    go DoSchedule(t)
    this.Ctx.WriteString("ok")
}

代码设计之启动一层分发任务

//开始执行tofile下面的任务进行分发,第一次的任务分发
func DoSchedule(task *models.Task) {
    task.Lock.Lock()
    defer task.Lock.Unlock()
    for ip, sm := range task.ServerStatus {
        if sm.Level == 1 {
            sm.IsStart = true
            i := 0
            subtask := &models.SubTask{SessionID: task.SessionID, FromFile: task.FromFile, ToFile: task.ToFile, ToIP: ip}
            for _, block := range task.FirstSchedule[ip] {
                i++
                //依次根据任务发送请求,如果出错就跳到下一个任务,每次只发送一个任务
                if block.IsSend != true {
                    if i < MAXBLOCKS {
                        subtask.Blocks = append(subtask.Blocks, block)
                    } else {
                        break
                    }
                    if len(task.FirstSchedule[ip]) == i && len(task.SecondSchedule[ip]) == 0 {
                        subtask.IsLast = true
                    }
                    block.IsSend = true
                }
            }
            retry := 0
        SendToPortal:
            //发送任务到agent
            err := models.SendToPortal(ip, subtask)
        }
    }
}

代码设计之触发二级分发任务

func (this *BackController) Post() {
    。。。
    //获取对应的任务
    t := DoingList.Get(SessionID)
    t.Lock.Lock()
    defer t.Lock.Unlock()
    if Errorinfo != "" {
        //如果有错误那么置对应的IP状态
        //如果是第一层的服务器出错,那么底层的所有服务器都设置为错误
        if t.ServerStatus[ToIP].Level == 1 {
            。。。
            //当第一层里面同组的其他服务全部失败的时候,进行第二层服务器的错误处理
            。。。
        }
        tryit := 0
        for {
            err := models.BackToSender(t.SessionID, ToFile, ToIP, Errorinfo)
            
        }
        go CheckFinish(SessionID, ToIP)
        return
    }
    //检查一层的代码块是否都完成
    //检查二层的代码快是否都完成
    go DoNextTask(ToIP, SessionID, fromip, donexttask)
    beego.Info("finish")
    this.Ctx.WriteString("ok")
}

agent代码分享

任务很简单,接收任务,下载数据块

func task(t *models.Task) {
    err := models.InitTask(t)
    for k, b := range t.Blocks {

        //检测本地有缓存文件.astaxie 如果该快文件已经下载过并且MD5一样不再下载
        
    ReadToFile:
         //下载文件
        models.ReadToFile(t.FromFile, t.ToFileFD, b)
    SaveToFile:
        //下载成功保存到.astaxie缓存文件中
        models.SaveToFile(t.TmpFileFD, b)

    BackInfoToCenter:
        //报告调度中心完成了该快的下载
        err = models.BackInfoToCenter(t.SessionID, t.ToIP, t.ToFile, b)        
    }
    //如果是最后一块那么删除.astaxie缓存文件
    if t.IsLast {
        models.DelTmpFile(t.ToFile)
    }
}

调度系统

基于两个点得定位:
- 用户IP对应的zone
- 机器负载值

管理系统的后台设置

IP存储结构Treap

Treap=Tree+Heap

Treap的Go实现

var (
    IpZones *treap.Tree
)

func init() {
    IpZones = treap.NewTree(IPLess)
    LoadIP()
}

type IPKey struct {
    StartIP uint64
    EndIp   uint64
}

type IPItem struct {
    StartIP uint64
    EndIp   uint64
    Zone    string
}

func IPLess(a, b interface{}) bool {
    aa := a.(*IPKey)
    bb := b.(*IPKey)

    //超找IP过程
    if bb.StartIP == bb.EndIp {
        if bb.StartIP > aa.StartIP && aa.EndIp > bb.StartIP {
            return false
        }
    }

    if aa.StartIP == aa.EndIp {
        if aa.StartIP > bb.StartIP && bb.EndIp > aa.StartIP {
            return false
        }
    }

    //插入用
    if aa.StartIP < bb.StartIP {
        return true
    }
    if aa.StartIP == bb.StartIP {
        return aa.EndIp < bb.EndIp
    }
    return false
}

IP库整理结构图

代码设计之IP库设计

func LoadIP() {
    file, err := os.Open("conf/source_div.acl") // For read access.
    breade := bufio.NewReaderSize(file, 10240)

    //文件中acl开始模块
    regstart, err := regexp.Compile(`acl "([\w]+)" {`)

    cruzone := ""
    for {
        line, _, err := breade.ReadLine()

        //zone开始
        matchs := regstart.FindStringSubmatch(string(line))

        if len(matchs) == 2 {
            cruzone = matchs[1]
            continue
        }

        //结束zone
        b, _ := regexp.Match(`};`, line)
        if b {
            cruzone = ""
            continue
        }

        //中间的IP处理
        ips := strings.Split(string(line), ";")
        for _, ip := range ips {
            if ip == "" {
                continue
            }
            subips := strings.Split(ip, `/`)
            if len(subips) == 1 {
                ip_long := getip(subips[0])
                startip := ip_long
                endip := ip_long
                ipitem := &IPItem{startip, endip, cruzone}
                ipkey := &IPKey{startip, endip}
                IpZones.Insert(ipkey, ipitem)
            }
            if len(subips) > 1 {
                ip_long := getip(subips[0])
                startip := ip_long & (((1 << mask) - 1) << (32 - mask))
                endip := startip + ((1 << (32 - mask)) - 1)
                ipitem := &IPItem{startip, endip, cruzone}
                ipkey := &IPKey{startip, endip}
                IpZones.Insert(ipkey, ipitem)
            }
        }
    }
}

代码设计之调度算法

//获取客户端地址的long
ip_long := models.IP2Number(requestip)
//地址库里面返回对应地址的zone
item := models.IpZones.Get(&models.IPKey{ip_long, ip_long})
//如果IP地址不在地址库里面,那么执行默认的路由
if item == nil {
    for ip, _ := range servers {
        isok := IsOkIp(ip)
        if isok == 1 {
            return ip
        } else if isok == 2 {
            CanServ = append(CanServ, ip)
        }
        AllServer = append(AllServer, ip)
    }
    if len(CanServ) > 0 {
        return CanServ[randseed.Intn(len(CanServ))]
    } else if len(AllServer) > 0 {
        return AllServer[randseed.Intn(len(AllServer))]
    }
} else {
    cruzone := item.(*models.IPItem)
    ipisp := strings.Split(cruzone.Zone, "_")        
    //根据地址库里面的zone获取对应服务的idc列表
    zoneips, zok := d.Zones[cruzone.Zone]
    if zok {
        //先从配置的zoneip中调用 优先第一层调度
        for ip, _ := range zoneips[0] {
            if _, ok := servers[ip]; ok {
                isok := IsOkIp(ip)
                if isok == 1 {
                    return ip
                } else if isok == 2 {
                    CanServ = append(CanServ, ip)
                }
                AllServer = append(AllServer, ip)
            }
        }
        //第一层调度失败,第二层调度
        for ip, _ := range zoneips[1] {
            if _, ok := servers[ip]; ok {
                isok := IsOkIp(ip)
                if isok == 1 {
                    return ip
                } else if isok == 2 {
                    CanServ = append(CanServ, ip)
                }
                AllServer = append(AllServer, ip)
            }
        }
    }
    if len(CanServ) > 0 {
        return CanServ[randseed.Intn(len(CanServ))]
    }
    for ip, isp := range servers {
        if isp&curisp == isp {
            isok := IsOkIp(ip)
            if isok == 1 {
                return ip
            } else if isok == 2 {
                TwoCanServ = append(TwoCanServ, ip)
            }
            AllServer = append(AllServer, ip)
        }
    }
    if len(TwoCanServ) > 0 {
        return TwoCanServ[randseed.Intn(len(TwoCanServ))]
    } else if len(AllServer) > 0 {
        return AllServer[randseed.Intn(len(AllServer))]
    }
}

目前的运行状况

目前正在改进的点

为什么选择Go

高富帅特性:

我和Go的故事

Q&A

Thank you

AstaXie

CDN, SNDA