CDN下载架构设计
ECUG深圳, Dec 2013
AstaXie
CDN, SNDA
AstaXie
CDN, SNDA
CDN下载加速主要是为最终用户提供稳定、快速、高质量的下载服务,有效提升下载服务器的处理能力,缓解服务器出口带宽压力,提升下载服务能力和用户下载体验
如下图所示:
1. 一级分发先把整个文件进行逻辑的分块,映射到机房内所有的机器,并发的拉取
2. 二级分发采用机房内块copy
type Task struct { SessionID string Lock sync.RWMutex FromFile string ToFile string Blocks []*SubBlock Servers []*Server ServerStatus map[string]*ServerMap FirstSchedule map[string][]*SubBlock SecondSchedule map[string]map[string][]*SubBlock }
func (this *MainController) Post() { //解析任务 t, err := models.GetTask(jsoninfo) go DoSchedule(t) this.Ctx.WriteString("ok") }
//开始执行tofile下面的任务进行分发,第一次的任务分发 func DoSchedule(task *models.Task) { task.Lock.Lock() defer task.Lock.Unlock() for ip, sm := range task.ServerStatus { if sm.Level == 1 { sm.IsStart = true i := 0 subtask := &models.SubTask{SessionID: task.SessionID, FromFile: task.FromFile, ToFile: task.ToFile, ToIP: ip} for _, block := range task.FirstSchedule[ip] { i++ //依次根据任务发送请求,如果出错就跳到下一个任务,每次只发送一个任务 if block.IsSend != true { if i < MAXBLOCKS { subtask.Blocks = append(subtask.Blocks, block) } else { break } if len(task.FirstSchedule[ip]) == i && len(task.SecondSchedule[ip]) == 0 { subtask.IsLast = true } block.IsSend = true } } retry := 0 SendToPortal: //发送任务到agent err := models.SendToPortal(ip, subtask) } } }
func (this *BackController) Post() { 。。。 //获取对应的任务 t := DoingList.Get(SessionID) t.Lock.Lock() defer t.Lock.Unlock() if Errorinfo != "" { //如果有错误那么置对应的IP状态 //如果是第一层的服务器出错,那么底层的所有服务器都设置为错误 if t.ServerStatus[ToIP].Level == 1 { 。。。 //当第一层里面同组的其他服务全部失败的时候,进行第二层服务器的错误处理 。。。 } tryit := 0 for { err := models.BackToSender(t.SessionID, ToFile, ToIP, Errorinfo) } go CheckFinish(SessionID, ToIP) return } //检查一层的代码块是否都完成 //检查二层的代码快是否都完成 go DoNextTask(ToIP, SessionID, fromip, donexttask) beego.Info("finish") this.Ctx.WriteString("ok") }
任务很简单,接收任务,下载数据块
func task(t *models.Task) { err := models.InitTask(t) for k, b := range t.Blocks { //检测本地有缓存文件.astaxie 如果该快文件已经下载过并且MD5一样不再下载 ReadToFile: //下载文件 models.ReadToFile(t.FromFile, t.ToFileFD, b) SaveToFile: //下载成功保存到.astaxie缓存文件中 models.SaveToFile(t.TmpFileFD, b) BackInfoToCenter: //报告调度中心完成了该快的下载 err = models.BackInfoToCenter(t.SessionID, t.ToIP, t.ToFile, b) } //如果是最后一块那么删除.astaxie缓存文件 if t.IsLast { models.DelTmpFile(t.ToFile) } }
基于两个点得定位:
- 用户IP对应的zone
- 机器负载值
Treap=Tree+Heap
var ( IpZones *treap.Tree ) func init() { IpZones = treap.NewTree(IPLess) LoadIP() } type IPKey struct { StartIP uint64 EndIp uint64 } type IPItem struct { StartIP uint64 EndIp uint64 Zone string } func IPLess(a, b interface{}) bool { aa := a.(*IPKey) bb := b.(*IPKey) //超找IP过程 if bb.StartIP == bb.EndIp { if bb.StartIP > aa.StartIP && aa.EndIp > bb.StartIP { return false } } if aa.StartIP == aa.EndIp { if aa.StartIP > bb.StartIP && bb.EndIp > aa.StartIP { return false } } //插入用 if aa.StartIP < bb.StartIP { return true } if aa.StartIP == bb.StartIP { return aa.EndIp < bb.EndIp } return false }
func LoadIP() { file, err := os.Open("conf/source_div.acl") // For read access. breade := bufio.NewReaderSize(file, 10240) //文件中acl开始模块 regstart, err := regexp.Compile(`acl "([\w]+)" {`) cruzone := "" for { line, _, err := breade.ReadLine() //zone开始 matchs := regstart.FindStringSubmatch(string(line)) if len(matchs) == 2 { cruzone = matchs[1] continue } //结束zone b, _ := regexp.Match(`};`, line) if b { cruzone = "" continue } //中间的IP处理 ips := strings.Split(string(line), ";") for _, ip := range ips { if ip == "" { continue } subips := strings.Split(ip, `/`) if len(subips) == 1 { ip_long := getip(subips[0]) startip := ip_long endip := ip_long ipitem := &IPItem{startip, endip, cruzone} ipkey := &IPKey{startip, endip} IpZones.Insert(ipkey, ipitem) } if len(subips) > 1 { ip_long := getip(subips[0]) startip := ip_long & (((1 << mask) - 1) << (32 - mask)) endip := startip + ((1 << (32 - mask)) - 1) ipitem := &IPItem{startip, endip, cruzone} ipkey := &IPKey{startip, endip} IpZones.Insert(ipkey, ipitem) } } } }
//获取客户端地址的long ip_long := models.IP2Number(requestip) //地址库里面返回对应地址的zone item := models.IpZones.Get(&models.IPKey{ip_long, ip_long}) //如果IP地址不在地址库里面,那么执行默认的路由 if item == nil { for ip, _ := range servers { isok := IsOkIp(ip) if isok == 1 { return ip } else if isok == 2 { CanServ = append(CanServ, ip) } AllServer = append(AllServer, ip) } if len(CanServ) > 0 { return CanServ[randseed.Intn(len(CanServ))] } else if len(AllServer) > 0 { return AllServer[randseed.Intn(len(AllServer))] } } else { cruzone := item.(*models.IPItem) ipisp := strings.Split(cruzone.Zone, "_") //根据地址库里面的zone获取对应服务的idc列表 zoneips, zok := d.Zones[cruzone.Zone] if zok { //先从配置的zoneip中调用 优先第一层调度 for ip, _ := range zoneips[0] { if _, ok := servers[ip]; ok { isok := IsOkIp(ip) if isok == 1 { return ip } else if isok == 2 { CanServ = append(CanServ, ip) } AllServer = append(AllServer, ip) } } //第一层调度失败,第二层调度 for ip, _ := range zoneips[1] { if _, ok := servers[ip]; ok { isok := IsOkIp(ip) if isok == 1 { return ip } else if isok == 2 { CanServ = append(CanServ, ip) } AllServer = append(AllServer, ip) } } } if len(CanServ) > 0 { return CanServ[randseed.Intn(len(CanServ))] } for ip, isp := range servers { if isp&curisp == isp { isok := IsOkIp(ip) if isok == 1 { return ip } else if isok == 2 { TwoCanServ = append(TwoCanServ, ip) } AllServer = append(AllServer, ip) } } if len(TwoCanServ) > 0 { return TwoCanServ[randseed.Intn(len(TwoCanServ))] } else if len(AllServer) > 0 { return AllServer[randseed.Intn(len(AllServer))] } }
高富帅特性: