当前位置: 首页 > news >正文

做彩票网站模板网站建设结算方式

做彩票网站模板,网站建设结算方式,网站不被收录的原因,广西冶金建设公司网站文章目录 0x00 准备0x01 MapReduce简介0x02 RPC0x03 调试0x04 代码coordinator.gorpc.goworker.go 0x00 准备 阅读MapReduce论文配置GO环境 因为之前没用过GO,所以 先在网上学了一下语法A Tour of Go 感觉Go的接口和方法的语法和C挺不一样, 并发编程也挺有意思 0x01 MapRed… 文章目录 0x00 准备0x01 MapReduce简介0x02 RPC0x03 调试0x04 代码coordinator.gorpc.goworker.go 0x00 准备 阅读MapReduce论文配置GO环境 因为之前没用过GO,所以 先在网上学了一下语法A Tour of Go 感觉Go的接口和方法的语法和C挺不一样, 并发编程也挺有意思 0x01 MapReduce简介 需要实现master和coordinator。 MapReduce分为两个阶段:Map和Reduce阶段。 Map阶段函数提供Key,比如pg-being_ernest.txt是key然后Worker通过这个Key获取Value。比如pg-being_ernest.txt的具体内容。然后将Key和Value在例子中是文章的内容传递给map function。获取结果并将结果分成R个Reduce内容。 举个例子。假设我们要对pg-being_ernest.txt和pg-dorian_gray.txt统计词频。那么就要有两个Map Task(不一定有两个Worker比如有3个Worker那么就是2个Worker干活一个围观如果只有一个Worker那么该Worker会被前后分配两次Map操作)。假设有3个Reduce操作那么Map的中间操作就会按照key被分为3个文件。 pg-being_ernest.txt对应Map0 , Map0操作的kv被分进mr-0-0,mr-0-1,mr-0-2 pg-dorian_gray.txt对应Map0 , Map0操作的kv被分进mr-1-0,mr-1-1,mr-1-2 当所有的Mapf已经生成结果Worker就会被指派Reduce操作。比如被指派的Reduce操作编号为2那么Reduce就会读取mr-0-2,mr-1-2。并且聚合相同的Key,传递给Reduce函数。 比如,pg-being_ernest.txt中的map操作有kva 1 b 1 b 1输出到mr-0-2。pg-dorian_gray.txt中的map操作有kvc 1 b 1 c 1输出到mr-0-2。 然后Task编号为2的Reduce任务会读取所有对应的中间文件。得到key。a 1 b 1 b 1 c 1 b 1 c 1。然后再对要处理的key进行排序,得到 a 1 b 1 b 1 b 1 c 1 c 1。再按照相同的key调用reduce函数。 上面例子的调用为 reducef(key:a,value:list[1]),得到1 reducef(key:b,value:list[1,1,1]),得到3 reducef(key:c,value:list[1,1]),得到2 最后将kvs:[{“a”,“1”},{“b”,“3”},{“c”,“2”}]写入该reduce生成的文件mr-out-2 0x02 RPC 使用GO的RPC库可以简单地实现Server 学习时参考了Go 每日一库之 rpc - 知乎 (zhihu.com) 在MapReduce操作流程中就是 首先启动多个Worker以下简称C和一个Coordinator以下简称SC每隔一段时间(比如1s)会向S发送一个任务请求 S首先检查Map任务还有没有分配完注意不是运行完。如果没有分配一个Map任务给C如果Map任务分配完了并且还没有工作完S让C等待如果Map工作完了。Reduce还没分配完了S给C分配一个空闲的Reduce任务如果Reduce都工作完了所有任务也都结束了。 如果C完成了任务会向S发送一个请求。S知道了某个任务完成就会进行相应的操作标记。 一些注意的点: 每个任务是有时间上限的10s。每分配一个任务就会启动一个GO程然后等待相应的时间检查是否完成了工作。如果没完成将该任务编号重新加入管道。 如何判断一个任务是否完成呢? 比如第一个Worker申请到了任务1过了10s钟还没有完成S又将任务1加入待完成管道。此时第2个worker申请到了任务1又过了4s第一个Worker发送一个MapDone的请求给S。S如何判断是否完成了该任务。 我的处理是维护任务是由哪个Worker运行的状态。其中Worker由RPC的时间戳标记。比如worker1在第一次请求时时间戳为13213123,Server维护maptask[1]是由13213123正在运行当第一次超时maptask[1]变成了worker2请求时的时间戳13219889。在第14s,收到MapDone的请求检查其时间戳为13213123和当前正在运行的时间戳不同所以丢弃掉该结果。 还有就是并发处理这个使用锁就行了。 0x03 调试 命令行的参数因为不用shell的话不能用通配符pg*.txt代替只能输入所有文件名 pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt 在调试时出现报错 cannot load plugin ./wc.so err: plugin.Open(./wc): plugin was built with a different version of package internal/abi 是因为build wc.so时的参数和运行mr参数不一致导致的。 使用./test-mr-many.sh 3重复测试3次。通过测试 感觉Lab1做下来还是挺通透。像是引入GO和相关概念。通过lab,学习到了GO调试。 0x04 代码 coordinator.go package mrimport (logsynctime ) import net import os import net/rpc import net/httptype status int // 用于指示worker的状态const (notStart status iotarunningtaskDone ) const workMaxTime 12 * time.Secondtype Coordinator struct {// Your definitions here.nReduce int // Reduce数量mMap int // Map数量taskDone boolreduceTaskStatus []statusmapTaskStatus []status// runningMap 是当前正在running的rpcId// 想一下这种情况第一个worker没有在10秒内返回结果于是master开始把同样的任务返回给了第二个worker,此时又过了几秒比如两秒钟// 那么master如何判断是第二个worker完成了任务还是第一个worker呢runningMap []RpcIdTrunningReduce []RpcIdTmapTasks chan TaskIdT // 待开始的mapreduceTasks chan TaskIdT // 待开始的reducefiles []string // 要进行task的文件mapCnt int // 已完成的map数量reduceCnt int // 已完成的reduce数量latch *sync.Cond }// Your code here -- RPC handlers for the worker to call.// Example // an example RPC handler. // // the RPC argument and reply types are defined in rpc.go. func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {reply.Y args.X 1return nil }// Appoint 用于worker请求一个任务 func (c *Coordinator) Appoint(request *ReqArgs, reply *ResArgs) error {reply.ResId request.ReqIdreply.MapNumM c.mMapreply.ReduceNumN c.nReducec.latch.L.Lock()done : c.taskDonec.latch.L.Unlock()if done {reply.ResOp WorkDonereturn nil}switch request.ReqOp {case WorkReq:{// 请求一个任务c.latch.L.Lock()if len(c.mapTasks) 0 {// 如果map任务还没有完全分配 分配一个map workertaskId : -c.mapTasksreply.ResTaskId taskIdreply.ResContent c.files[taskId]reply.ResOp WorkMapc.runningMap[taskId] reply.ResIdc.mapTaskStatus[taskId] runningc.latch.L.Unlock()go c.checkDone(WorkMap, reply.ResTaskId)log.Printf(Assign map \t%d to \t%d\n, reply.ResTaskId, reply.ResId)return nil}if c.mapCnt c.mMap {// 如果map任务已经全部分配完了但是还没有运行完成还不能开始reduce// worker需要暂时等待一下reply.ResOp WorkNothingc.latch.L.Unlock()log.Println(Map All assigned but not done)return nil}if len(c.reduceTasks) 0 {// 已经确定完成了所有map还没有分配完reducetaskId : -c.reduceTasksreply.ResTaskId taskIdreply.ResOp WorkReducec.runningReduce[taskId] reply.ResIdc.reduceTaskStatus[taskId] runningc.latch.L.Unlock()go c.checkDone(WorkReduce, reply.ResTaskId)log.Printf(Assign reduce \t%d to \t%d\n, reply.ResTaskId, reply.ResId)return nil}// 如果分配完了所有的reduce,但是还没有done.worker需要等待reply.ResOp WorkNothinglog.Println(Reduce All assigned but not done)c.latch.L.Unlock()return nil}case WorkMapDone:{c.latch.L.Lock()defer c.latch.L.Unlock()if c.runningMap[request.ReqTaskId] ! request.ReqId || c.mapTaskStatus[request.ReqTaskId] ! running {// 说明该map已经被abortreply.ResOp WorkTerminatereturn nil}log.Printf(Work Map \t%d done by \t%d\n, request.ReqTaskId, request.ReqId)c.mapTaskStatus[request.ReqTaskId] taskDonec.mapCnt}case WorkReduceDone:{c.latch.L.Lock()defer c.latch.L.Unlock()if c.runningReduce[request.ReqTaskId] ! request.ReqId || c.reduceTaskStatus[request.ReqTaskId] ! running {// 说明该map已经被abortreply.ResOp WorkTerminatereturn nil}c.reduceTaskStatus[request.ReqTaskId] taskDonec.reduceCntlog.Printf(Work Reduce \t%d done by \t%d\n, request.ReqTaskId, request.ReqId)if c.reduceCnt c.nReduce {c.taskDone truereply.ResOp WorkDone}}default:return nil}return nil }// start a thread that listens for RPCs from worker.go func (c *Coordinator) server() {log.Println(Launching Server)e : rpc.Register(c)if e ! nil {log.Fatal(register error:, e)}rpc.HandleHTTP()//l, e : net.Listen(tcp, :1234)sockname : coordinatorSock()_ os.Remove(sockname)l, e : net.Listen(unix, sockname)go func(l net.Listener) {for {time.Sleep(5 * time.Second)if c.Done() {err : l.Close()if err ! nil {log.Fatal(close error:, err)}}}}(l)if e ! nil {log.Fatal(listen error:, e)}go func() {err : http.Serve(l, nil)if err ! nil {log.Fatal(server error:, err)}}() }// Done main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. func (c *Coordinator) Done() bool {c.latch.L.Lock()defer c.latch.L.Unlock()// Your code here.return c.taskDone }// checkDone 检查任务是否完成 func (c *Coordinator) checkDone(workType WorkType, t TaskIdT) {time.Sleep(workMaxTime)c.latch.L.Lock()defer c.latch.L.Unlock()switch workType {case WorkMap:{if c.mapTaskStatus[t] ! taskDone {c.mapTaskStatus[t] notStartc.mapTasks - t}}case WorkReduce:{if c.reduceTaskStatus[t] ! taskDone {// 如果没有完成任务c.reduceTaskStatus[t] notStartc.reduceTasks - t}}default:log.Panicf(Try Check Invalid WorkType %v\n, workType)}}// MakeCoordinator create a Coordinator. // main/mrcoordinator.go calls this function. // nReduce is the number of reduce tasks to use. func MakeCoordinator(files []string, nReduce int) *Coordinator {log.Println(Launching Master Factory)c : Coordinator{}c.nReduce nReducec.mMap len(files) // 每个file对应一个mapc.taskDone falsec.files filesc.mapTasks make(chan TaskIdT, c.mMap)c.mapTaskStatus make([]status, c.mMap)c.runningMap make([]RpcIdT, c.mMap)c.reduceTaskStatus make([]status, nReduce)c.reduceTasks make(chan TaskIdT, nReduce)c.runningReduce make([]RpcIdT, nReduce)c.latch sync.NewCond(sync.Mutex{})for i : 0; i c.mMap; i {c.mapTasks - TaskIdT(i)c.runningMap[i] -1c.mapTaskStatus[i] notStart}for i : 0; i c.nReduce; i {c.reduceTasks - TaskIdT(i)c.runningReduce[i] -1c.reduceTaskStatus[i] notStart}c.server()return c }rpc.go package mr// // RPC definitions. // // remember to capitalize all names. //import os import strconv// // example to show how to declare the arguments // and reply for an RPC. //type ExampleArgs struct {X int }type ExampleReply struct {Y int } type RpcIdT int64 // RpcIdT 是通过时间戳生成的, 指示一个唯一的RpcId type ReqArgs struct {ReqId RpcIdTReqOp WorkTypeReqTaskId TaskIdT }// ResArgs 是RPC的返回 // Response type ResArgs struct {ResId RpcIdTResOp WorkTypeResTaskId TaskIdT // 分配的任务编号ResContent stringReduceNumN int // 有n个reduceMapNumM int // 有M个map任务 } type WorkType int// TaskIdT 是对任务的编号 type TaskIdT int// 枚举工作类型 const (WorkNothing WorkType iotaWorkReq // worker申请工作WorkMap // 分配worker进行map操作WorkReduce // 分配worker进行reduce操作WorkDone // [[unused]]master所有的工作完成WorkTerminate // 工作中止WorkMapDone // Worker完成了map操作WorkReduceDone // Worker完成了reduce操作 )// Rpc exports struct we need type Rpc struct {Req ReqArgsRes ResArgs }// Cook up a unique-ish UNIX-domain socket name // in /var/tmp, for the coordinator. // Cant use the current directory since // Athena AFS doesnt support UNIX-domain sockets. func coordinatorSock() string {s : /var/tmp/824-mr-s strconv.Itoa(os.Getuid())return s }worker.go package mrimport (encoding/jsonfmtioossortstrconvtime ) import log import net/rpc import hash/fnvconst sleepTime 500 * time.Millisecond// KeyValue // Map functions return a slice of KeyValue type KeyValue struct {Key stringValue string } type ByKey []KeyValue// Len 通过HashKey进行排序 func (a ByKey) Len() int { return len(a) } func (a ByKey) Swap(i, j int) { a[i], a[j] a[j], a[i] } func (a ByKey) Less(i, j int) bool { return ihash(a[i].Key) ihash(a[j].Key) }// use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. func ihash(key string) int {h : fnv.New32a()_, err : h.Write([]byte(key))if err ! nil {return 0}return int(h.Sum32() 0x7fffffff) }// Worker // main/mrworker.go calls this function. func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.for {timeStamp : time.Now().Unix()rpcId : RpcIdT(timeStamp)req : ReqArgs{}req.ReqId rpcIdreq.ReqOp WorkReq // 请求一个工作res : ResArgs{}ok : call(Coordinator.Appoint, req, res)if !ok {// 如果Call发生错误log.Println(Maybe Coordinator Server has been closed)return}switch res.ResOp {case WorkDone:// 所有工作已经完成returncase WorkMap:doMap(rpcId, res, mapf)case WorkReduce:doReduce(rpcId, res, reducef)case WorkNothing:// 等待time.Sleep(sleepTime)default:break}time.Sleep(sleepTime)} } func doMap(rpcId RpcIdT, response *ResArgs, mapf func(string, string) []KeyValue) {// filename 是response中的文件名filename : response.ResContentfile, err : os.Open(filename)if err ! nil {log.Fatalf(cannot open %v, filename)}defer func(file *os.File) {_ file.Close()}(file)// content读取该文件中的所有内容content, err : io.ReadAll(file)if err ! nil {log.Fatalf(cannot read %v, filename)}kvs : mapf(filename, string(content))// 需要将kv输出到n路 中间文件中ofiles : make([]*os.File, response.ReduceNumN)encoders : make([]*json.Encoder, response.ReduceNumN)for i : 0; i response.ReduceNumN; i {// 这里输出的名字是mr-ResTaskId-reduceN// 其中ResTaskId是0~m的数字oname : mr- strconv.Itoa(int(response.ResTaskId)) - strconv.Itoa(i)ofiles[i], err os.Create(oname)if err ! nil {log.Fatal(Cant Create Intermediate File: , oname)}defer func(file *os.File, oname string) {err : file.Close()if err ! nil {log.Fatal(Cant Close Intermediate File, oname)}}(ofiles[i], oname)encoders[i] json.NewEncoder(ofiles[i])}for _, kv : range kvs {ri : ihash(kv.Key) % response.ReduceNumNerr : encoders[ri].Encode(kv)if err ! nil {log.Fatal(Encode Error: , err)return}}req : ReqArgs{ReqId: rpcId,ReqOp: WorkMapDone,ReqTaskId: response.ResTaskId,}res : ResArgs{}call(Coordinator.Appoint, req, res) }func doReduce(rpcId RpcIdT, response *ResArgs, reducef func(string, []string) string) {rid : response.ResTaskId // 当前reduce的编号var kva []KeyValuefor i : 0; i response.MapNumM; i {// 读取所有该rid的中间值func(mapId int) {// 读取m-rid的中间值inputName : mr- strconv.Itoa(i) - strconv.Itoa(int(rid))// 在当前对应r的输出中获取所有keyifile, err : os.Open(inputName)if err ! nil {log.Fatal(Cant open file: , inputName)}defer func(file *os.File) {err : file.Close()if err ! nil {log.Fatal(Cant close file: , inputName)}}(ifile)dec : json.NewDecoder(ifile)for {var kv KeyValueif err : dec.Decode(kv); err ! nil {break}kva append(kva, kv) //}}(i)}// 通过hashKey排序sort.Sort(ByKey(kva))intermediate : kva[:]oname : mr-out- strconv.Itoa(int(rid))ofile, err : os.Create(oname)if err ! nil {log.Fatal(Cant create file: , oname)}defer func(ofile *os.File) {err : ofile.Close()if err ! nil {log.Fatal(Cant close file: , oname)}}(ofile)// log.Println(Total kv len: , len(intermediate))// cnt : 0i : 0for i len(intermediate) {j : i 1for j len(intermediate) intermediate[j].Key intermediate[i].Key {j}var values []stringfor k : i; k j; k {values append(values, intermediate[k].Value)}// cntoutput : reducef(intermediate[i].Key, values)// this is the correct format for each line of Reduce output._, fprintf : fmt.Fprintf(ofile, %v %v\n, intermediate[i].Key, output)if fprintf ! nil {return}i j}// log.Println(Unique key count: , cnt)req : ReqArgs{ReqId: rpcId,ReqOp: WorkReduceDone,ReqTaskId: response.ResTaskId,}res : ResArgs{}call(Coordinator.Appoint, req, res) }// CallExample // example function to show how to make an RPC call to the coordinator. // // the RPC argument and reply types are defined in rpc.go. func CallExample() {// declare an argument structure.args : ExampleArgs{}// fill in the argument(s).args.X 99// declare a reply structure.reply : ExampleReply{}// send the RPC request, wait for the reply.call(Coordinator.Example, args, reply)// reply.Y should be 100.fmt.Printf(reply.Y %v\n, reply.Y) }// send an RPC request to the coordinator, wait for the response. // usually returns true. // returns false if something goes wrong. func call(rpcName string, args interface{}, reply interface{}) bool {// c, err : rpc.DialHTTP(tcp, 127.0.0.1:1234)sockname : coordinatorSock()c, err : rpc.DialHTTP(unix, sockname)if err ! nil {log.Fatal(dialing:, err)}defer func(c *rpc.Client) {err : c.Close()if err ! nil {log.Fatal(Close Client Error When RPC Calling, err)}}(c)err c.Call(rpcName, args, reply)if err nil {return true}fmt.Println(err)return false }
http://www.hkea.cn/news/14509296/

相关文章:

  • 国外专门用于做网站图片的wordpress全站ajax
  • wap类网站临沂网站建设哪家好
  • 静态网站 服务器seo最新教程
  • 小企业网站建设有什么用长春建站程序
  • 怎么样更好的做网站怎么修复网站死链
  • 网站建设存在困难关键洞察力
  • 自己造网站建设银行江苏省行网站
  • 河东做网站公司wordpress游客看小图登陆查看大图
  • 汽车网站建设可行性分析娱乐网站的特点
  • 山东汽车行业网站开发网站风格趋势
  • 天津优化网站网站备案审批号
  • 做电子请帖的网站科技企业网站
  • 彩票网站平台企业年报系统官网
  • 网站初期建设宣传网站在百度上搜不到
  • flash网站的制作为什么需要网站开发
  • 新安县住房和城乡建设局网站网络设计一般包括
  • 免费网站建设专业服务平台企业做网站需要租服务器吗
  • 做网站私活企业所得税最新政策
  • 惠州网站建设找惠州邦手机版网站怎么做的
  • 企业网站哪家好上海品牌推广公司
  • wordpress禁止中国ip网站后台的seo功能
  • 中国建设教育协会培训中心网站微信平台与微网站开发
  • 顺德网站建设教程如何更改wordpress上的默认头像
  • 怎么建自己公司网站saas自助建站
  • 好看的企业门户网站公司网站内容建设
  • 网站百度抓取网站用哪些系统做的比较好用
  • 洪梅镇网站建设公司网站建设选信达互联
  • 上海网站改版哪家好网站logo 更换
  • 网站建设与网页设计难学吗做外贸上哪些网站
  • 赣州市开发区建设局网站广州工程承包总公司