Hadoop学习总结

所需积分/C币:50 2016-06-08 20:34:05 1.92MB PDF
收藏 收藏
举报

Hadoop 学习总结
sfdfs. data.dir)/current/VERSION /blk <id 1> /blk cid /blk <id /blk <id 2>.meta /blk <id /blk <id 64>.meta /subdir/ /subdir1/ /subdir63/ 数据节点的 namespace=1232737062 storageD=Ds-1640411682-127.0.1.1-50010-1254997319480 CTime storageType=DATA_NODE blk<id>保存的是HDFS的数据块,其中保存了具体的二进时数据 bk_<d>.meta保存的是数据块的属性信总:发本信总,类型信总:和 checksum 当一个日录中的数据块到达一定数量的侯,则创建子文件夹来保仁数据块及数据块屑性信息 二、数据流( data flow 2.1、读文件的过程 客户i( client用 File system的open(函数打开文件 Distributed Filesystem用RPC调用元数据芒点,符到文件的数据块信息 对丁每个数据埃,元数据节点返冋保存数据块闩数据节点臼地址 Distributed File system返回 FSDatalInputstream给客户端,用米读取数据 客尸猎洞用 stream的read()浮数开始读取数据 DFSInputstrea连保存此文算一个数块的最近的数据有点 Data从数据节点议到客户端( client) 当此薮块煮取完芊时, DFSInputstream关闭和此惬节点的连接,然后连接此文件、一个数据决的最近的数据节点。 当客户端读取完毕数据的时候,调用 FSDatalnputstreεam的dose函数。 在读数据约过程中,如耒客户端在与数据乍点道信出现锴误,则尝试连接包含此效据虫的下一个数据节点 失欺的数据节点将板记录,以后不再连接 l:°pen Distributed 2: get block locat HDFS FileSystem k NameNode dient 3: rea FSData 6 close namenode InputStream dientⅣM dient nod 4. read 5: read DataNode Data Node Data Node datanode atanode datanode 22、写文件的过程 ·客户增用 create()来创建文件 Distributed| lesystem RPC调用元数点,仁文件系统的命名空问中创建一个新的文件 元效据节点首先确定文件原来不李在,并且客户端有创建文件的权限,然后创建新文件 Distributed File systen返回 DFSOutputstre 户端用于写数据 ·客户端大始写入数据, DFSOutputstrearm将数振分成奥,写λ data queue。 Data queue i Data Streamer读取,并通知元炇椐勹点分配澉椐节点,用来行储数据埃(每块默认复制3)。分的效握节点放在一个 pipeline里。 · Data streamer.*数据埞写入 pipeline中的第一饣溦据节忐。第一个据勹点粕数据块发送洽第一个欻振节点。篛二个效据节点将效据发送第三个数据 DFSOutput Stream为发出去的据保存了 ack queue,等待 pipeline中的数据点告知数据己经写入成功 如果数据节点在写入的过建中失败 ●关 pipeline,将 ack queue中的数据决放λ data queue的开始 当前的薮块在已经写入豹数据节点中被元薮据节点赋了新的标示,则锠误节点重启后能够察觉其数据块是过寸的,会涂 失敗的节点从 pipeline中移除,另外的数据块则写入 pipeline中的为外两个数节点 ●元薮据节点则被通知上数据决是复制玩数不足,来会再命建第三价备脊 当寳户编结束写入数据,则调用 stream的ccse函镦。此操仵所有的数块写入 pipeline中的数诟吉点,并等待 ack queue返回成功。最后通知亓数 据节点写入完毕。 2: create I create Distributed HDFS FileSystem 7: complete Name Node dient 3: write 6: dose FSData namenode OutputStream dient JVM dient node 4: wtite pad 5: ack Pipeline of DataNode Data Node DataNode datanodes datanode datanode datanode HDFS读写过程解析 文件的打开 11、客户端 HDFs打开个文件,需要左客户端调 Distributed Filesystem.pen( Path f, int buffersize),其实现为 (Path f, int e) throws IOException return new DFSClient DF SDataInputStream( dfs. open(getPathName(f), buffer Size, verify Checksum, statistics) 其中 dfs DistributedFilesystem的成员变量 DFSClient,其φpen函数被调用,其中创建一^ DFSInputstrear(src, buffersize, verify Checksum)并返巴。 在 DFSInputstream的造浮数中, openinfo函数被调用,其丰要从 namenode中得到要打开的文件所对应的 blocks的信鼠,现如下: synchronized void openInfo( throws IOException i Located Blocks newInfo callGetBlock Locations(namenode, src, 0, prefetchSize) this located blocks new Info this currentnode null private static Located Blocks callGetBlock Locations(ClientProtocol namenode, String src, long start long length) throws IOException i return namenode. get BlockLocations(src, start, length) ocated blocks主要包含个链表的Lst< Lacatedblock> blocks,其中每个 Located block含如下信息: Block b:此 block的信息 long offset: t block在文件十的偏移量 DatanodeInfo[]ocs:此bock位于哪些 Data Node上 上面 namenode . getblacklocations是个RPC调用,最终调用 Name node类的 getBlackLocations函数 1.2、 NameNode NameNode. getBlock Locations实现如下 public Located blocks getBlock Locations( String src ng length) throws IOException i return namesystem getBlockLocations(getClientMachineo Src, offset namesystem是 Name Node一个成员变量,其类坐为 FSNamesysten,保存的是 Namenode的 name space树:其中一个重要的成实变量为 FSDirectory FSDirectory和 Lucene中的 FSDirectory没有仨何关系,其主要包括 FSImage fsimage,用于读写候血上的 cimage文件, FSImage类有成贝变量 FSEditLog editLog,用于读写硬盘上的edt文件,这个文件的关系在上一篇文章中己红解释过 FSDirectory还有一个車要的成员变量 INodeDirectorywith Quota rootDir, INodeDirectory withA』ota的父类为 INode Directory:实现如下: public class INode directory extends INode 1 List <INode> child 出此可见 INodeDirectory本身是一个 INode,其中包含一个链表的 INode,此链表中,如果仍为文件夹,则是类 INodeDirectory,如果是文件,则是类犁 INodefile, I NodeFile中有成员变量 BlockInfo blocks[,是比文件包含的 block的信忘。显然这是一模树形的结构 FSNamesystem. getB| ocklocations函数如: boolean doAccessTime) throws IOException f final LocatedBlocks ret getBlockLocationsInternal(src dir getFileINode( src) offset, length, Integer. MAX VALUE, doAccessTime): return ret; dir. getFileINode(src)通过路径名从文件系统树中我到 INodefile,其中保存的是要打井的文件的 INode的信总 getBlockLocationsInterna的实现如下 private synchronized LocatedBlocks getBlockLocationsIntemal(String src, INodeFile inode long length, nt nrBlocksToReturn boolean doAccess Time //得到此文件的bock信息 de. getBlocks( List<Located Block> results new ArrayList<Located Block>(blocks. length) //计算从 offset开始,长度为 length所涉及的 blocks int curBlk =0 long curPos =0, blkSize =0; int nrBlocks =(blocks[O]. getNum Bytes==0)?0: blocks. length; for(curBlk =0: curBlk< nrBlocks; cur Blk++)f blkSize blocks[cur Blk]-getNum Bytes if (curPos blkSize >offset)t /当 offset在 curPos和 curPos+ blksize之间的时候, cur Blk指向 offset所在的bock curPos + blksize. //循环,依次遍历从 curLl开始的每个bock,直到当前位置 curPos越过 endor int num CorruptNodes countNodes(blocks[curB k)). corruptReplicas(: int numCorruptReplicas=corruptReplicas. numCorruptReplicas( blocks[curBlKD): boolean block corrupt=(num Corrupt Nodes = num Nodes) int numMachineSet= block corrupt num Nodes (numNodes-numCorruptNodes) //依次找到此 block所对应的 datanode,将其中没有损坏的放入 machineset中 DatanodeDescriptor[ machine Set= new Datanode descriptor[numMachineSet 0){ for(IteratorDatanode Descriptor> it blocks Map nodeIterator(blocks[curBIk]); it hasNext(; )i Datanode Descriptor dn =it. nexto boolean replica Corrupt= corruptReplicas isReplica Corrupt(blocks[curBlk], dn ) (block Corrupt I(!block Corrupt & !replicacorrupt)) machineset[numNodes++l= dn //使用此 machinesel和当前的bock构造一个 Located Block results. add (new located Block(blacks[cur Blkl, machineSet, curPos block Corrupt)) curPos + blocks[cur Blk]. getNumBytes( curBlk++ y while(curpos endorf & curBlk blocks length & results size (< nrBlacksToReturn); //使用此 Located block链表构造一个 Located blocks对象返回 return inode. createLocated Blocks(results); 13、客户端 通讨RPC调用,在 NameNode供到的 Located blocks对象,作为成员变量构造 DFSInputstream对系,最后包装为 FSDataInputstream返回给用户 二、文件的读取 2.1、客户端 文仁读取的时候:客户湍利用文件打开的时侯得到的 FSDataInputstream. read( ong position;byte] buffer, int offset,int| ength)函数进仨文件读操仵。 FSDataInputstream会调用其封装的 DFSInputstream的read( ong position,byte[] buffer, int offset, int length)函数,实现如下 public int read(long position, byte buffer, int offset int length) throws IOException i if ((position + length)> filelen)i realLen =(int (filelen-position //首先得到包含从 gth内容的bock列表 //比如对于64M一个 block的文件系统来说,欲读取从100M开始,长度为128M的数据,则bck列表包括第2,3,4块bock //对每一个 block,从中读取内容 //对于上而的例了,对于第2决 block,读取从36M开始,读取长度2BM,对于第3块,读取整块64M.对于第4块,读取从0开 始,长度为36M,共128M数据 for(Located Block blk blockRange)[ long targetStart position-blk. getStartoffseto) long bytesToRead= Math min(remaining, blk. getBlock Size(-target start) fetch BlockByte Range(blk, target start, targetStart t bytesToRead -1, buffer, offset) remaining -= bytesToRead offset + bytesToRead assert remaining ==0: Wrong number of bytes read stats. increment BytesRead(realLen) return reallen 其中 get BlockRange四数如卜 private synchronized List< LocatedBlock> getBlockRange(long offset throws IOException i //首先从缓存的 located Blocks中查找 offset所在的 block在级存链表中的位置 if (blockIdx <0)[// block is not cached blockIdx Located Blocks. getInsertIndex(blockIdx): long remaining = length; long carOff while(remaining >0)i adBlock blk null. /按照 blocked的位置找到 block if(blockIdx located Blocks located Block Counto) //如果bock为空,则缓存中没有此bock,则直接从 Namenode中查找这些 block,并加入缓存 if(blk = null l curoff blk, getstartoffseto Located ocks new blocks ions(namenade src, curoff, remaining lacated Blocks insertRange(blockIdx, new Blocks. getLocated blocks) //如果bock找到,则放入结果集 . add(blk long bytesRead= blk. getStartoffset(+ blk. get BlockSize -curoff; curOff += bytesRead //取下一个 其中 fetch Block ByteRange实现如下 private void fetchBlock ByteRange (Located Blockblock, long start long end, byte[ buf, int offset)throws IOException t Socket dn null: int numAttempts= block, getlocationso length //此 while循环为读取失败后的重试次数 //选择一个 Data Node来读取数据 DNAddrPair retval choose DataNode(block) DatanodeInfo chosen Node= retval info InetSocketAddress targetAddr retval addr //创建 Socket连接到 DataNode dn socketFactory create Socket dn. connect(targetAddr, socketTimeout) //利用建立的 Socket链接,生成一个 reader负责从 DataNode读取数据 reader= BlockReader newBlockReader(dn, sI block. get Block. getGenerationstampo tart, len buff verify Checksum, clientName) //汝取数据 int nread reader, readall(buf, offset, len); OUtils. closeSocket(dn) //如果读取失,则将此 Datanode标记为失败节点 add ToDeadNodes(chosen Node) BlockReader, new blockReader函数实塊如卜 public static BlockReader new BlockReader( socket sock String file long blockEd long startOffset, long let int buffer Size, boolean verify Checksum string clientName) /使用 Socket矬立写入流,向 DataNode发送读指令 tputstream out new dataoutputStream( new BufferedOutputstream(NetUtils getoutput stream sock, HdfsConstants WRITE_TIMEOUDDDi outwrite Short( Data TransferProtocol DATA_TRANSFER_VERSION aut. write( DataTransferProtocol OP READ_ BLOCK aut writelong( blockEd out writeLong( gen Stamp ) out writeLong( startOffset out writeLong( len ) Text. write out, clientName); out flushe //使用 Socke矬立读入流,用于从 DataNode读取数据 DataInputStream in new DataInputStream( new BufferedInputStream(NetUtils getInput stream(sock) bufferSize ))i Data Checksum checksum Datachecksum newData Checksum( in long firstchunkOffset= in readLongo //生成一个 reader,主要包含实入流,用于读取数据 return new Block Reader( file, blacked, in, checksum, verify Checksum, startOffset, firstChunk Offset, sock B| ockReader的 readA函数默是上4成的 DataInputstream读数据。 2.2、 Data Node 在 Datanode启动的时候,会调用函数 startDatanode,其十与数捷读取有关竹逻钳如下 yoid startDataNode( Configuration conf AbstractList< File> dataDirs throws IOException i //建立一个 Serversocket,并生成一个 Dataxceiver server来监控客户端的链接 ServerSocket Channel. open( socket(: new Server Socket ss.setReceive Buffer Size (DEFAULT_DATA_SOCKET_sIzE) ith the actual port selfAddr = new InetSocketAddress(ss. getInetAddress-getHostAddresso, tmpPort) this dn Registration setName(machineName + +tmpPort his thread Group new Thread Group( d this dataXceiverserver new Daemon(thread group, this)) this thread Group, setD

...展开详情
试读 68P Hadoop学习总结
立即下载 低至0.43元/次 身份认证VIP会员低至7折
抢沙发
一个资源只可评论一次,评论内容不能少于5个字
上传资源赚积分or赚钱
最新推荐
Hadoop学习总结 50积分/C币 立即下载
1/68
Hadoop学习总结第1页
Hadoop学习总结第2页
Hadoop学习总结第3页
Hadoop学习总结第4页
Hadoop学习总结第5页
Hadoop学习总结第6页
Hadoop学习总结第7页
Hadoop学习总结第8页
Hadoop学习总结第9页
Hadoop学习总结第10页
Hadoop学习总结第11页
Hadoop学习总结第12页
Hadoop学习总结第13页
Hadoop学习总结第14页

试读结束, 可继续读6页

50积分/C币 立即下载 >