博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark 源码分析 -- BlockStore
阅读量:6757 次
发布时间:2019-06-26

本文共 10359 字,大约阅读时间需要 34 分钟。

BlockStore

抽象接口类, 关键get和put都有两个版本

序列化, putBytes, getBytes
非序列化, putValues, getValues

其中putValues的返回值为PutResult, 它的data字段是Either类型: Left为Iterator (非序列化的值), Right为ByteBuffer (序列化后的字节)

/**
 * Result of putting a block into a block store.
 *
 * @param size size of the data that was put, as reported by the store that performed the put
 * @param data the values that were put: Left holds them as an Iterator, Right holds them as a
 *             ByteBuffer. NOTE: DiskStore.putValues passes null here when the caller did not
 *             ask for the values back, so callers must not assume this field is non-null.
 */
private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer])
 
/**
 * Abstract class to store blocks.
 *
 * Both put and get come in two flavours: putBytes/getBytes work on serialized data
 * (a ByteBuffer), putValues/getValues work on deserialized values.
 */
private[spark]
abstract class BlockStore(val blockManager: BlockManager) extends Logging {

  /**
   * Put in a block that is already available as serialized bytes.
   *
   * @param blockId id of the block to store
   * @param bytes   serialized content of the block
   * @param level   storage level requested for the block
   */
  def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel): Unit

  /**
   * Put in a block and, possibly, also return its content as either bytes or another Iterator.
   * This is used to efficiently write the values to multiple locations (e.g. for replication).
   *
   * @return a PutResult that contains the size of the data, as well as the values put if
   *         returnValues is true (if not, the result's data field can be null)
   */
  def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
    returnValues: Boolean): PutResult

  /**
   * Return the size of a block in bytes.
   */
  def getSize(blockId: String): Long

  /** Return the content of a block as bytes, wrapped in an Option. */
  def getBytes(blockId: String): Option[ByteBuffer]

  /** Return the content of a block as an iterator over its values, wrapped in an Option. */
  def getValues(blockId: String): Option[Iterator[Any]]

  /**
   * Remove a block, if it exists.
   * @param blockId the block to remove.
   * @return True if the block was found and removed, False otherwise.
   */
  def remove(blockId: String): Boolean

  def contains(blockId: String): Boolean

  /** Does nothing by default; subclasses may override. */
  def clear(): Unit = { }
}

 

DiskStore

对于DiskStore, 其实很单纯, 就是打开相应的文件进行读或写.

/**
 * Stores BlockManager blocks on disk.
 *
 * NOTE(review): createFile, getFile and getFileBytes are called below but are not part of
 * this excerpt; neither are the remaining BlockStore members (getSize, remove, contains).
 */
private class DiskStore(blockManager: BlockManager, rootDirs: String)
  extends BlockStore(blockManager) with Logging {

  /**
   * Writes the remaining content of the given buffer to the file created for the block.
   * The caller's buffer is left untouched because a duplicate is consumed instead.
   */
  override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel): Unit = {
    // So that we do not modify the input offsets !
    // duplicate does not copy buffer, so inexpensive
    val bytes = _bytes.duplicate()
    val file = createFile(blockId)
    val channel = new RandomAccessFile(file, "rw").getChannel()
    try {
      // A single write call may not consume the whole buffer, hence the loop.
      while (bytes.remaining > 0) {
        channel.write(bytes)
      }
    } finally {
      // Close the channel even when a write fails, so the file handle is not leaked.
      channel.close()
    }
  }

  /**
   * Serializes the values into the file created for the block.
   *
   * @return a PutResult carrying the length of the written file; its data is the file content
   *         as a ByteBuffer when returnValues is true, and null otherwise
   */
  override def putValues(
      blockId: String,
      values: ArrayBuffer[Any],
      level: StorageLevel,
      returnValues: Boolean)
    : PutResult = {
    val file = createFile(blockId)
    val fileOut = blockManager.wrapForCompression(blockId,
      new FastBufferedOutputStream(new FileOutputStream(file)))
    val objOut = blockManager.defaultSerializer.newInstance().serializeStream(fileOut)
    try {
      objOut.writeAll(values.iterator)
    } finally {
      // Close the stream even when serialization fails, so the file handle is not leaked.
      objOut.close()
    }
    // Read the length only after the stream has been closed.
    val length = file.length()
    if (returnValues) {
      // Return a byte buffer for the contents of the file
      val buffer = getFileBytes(file)
      PutResult(length, Right(buffer))
    } else {
      // The BlockStore contract allows a null data field when the values are not requested.
      PutResult(length, null)
    }
  }

  /** Returns the content of the block's file as a ByteBuffer, always wrapped in Some. */
  override def getBytes(blockId: String): Option[ByteBuffer] = {
    val file = getFile(blockId)
    val bytes = getFileBytes(file)
    Some(bytes)
  }

  /** Reads the block's bytes and deserializes them through the block manager. */
  override def getValues(blockId: String): Option[Iterator[Any]] = {
    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
  }
}

 

MemoryStore

对于MemoryStore复杂一些

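首先使用LinkedHashMap (保持顺序、可按序遍历的HashMap) 来组织MemoryStore, 其中每一项的结构为(blockId, Entry). 注意构造时第三个参数accessOrder为true, 即按访问顺序排列, 因此遍历时最先得到的是最久未被访问的block, ensureFreeSpace正是按这个顺序挑选要淘汰的block.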

使用Entry抽象来表示block内容
并且在put的时候, 还涉及到memory空间的释放, ensureFreeSpace

/**
 * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
 * serialized ByteBuffers.
 */
private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
  extends BlockStore(blockManager) {

  // An Entry represents the content of one block held in memory.
  case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)

  // The whole store: block id -> Entry. The third constructor argument (accessOrder = true)
  // makes the map iterate from the least recently accessed entry to the most recent one.
  private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
  private var currentMemory = 0L

  // Object used to ensure that only one thread is putting blocks and if necessary, dropping
  // blocks from the memory store. (The map itself is not thread-safe, so access to it is
  // additionally synchronized on `entries`.)
  private val putLock = new Object()

  /**
   * Stores a block that arrives as serialized bytes.
   *
   * If the storage level asks for deserialized storage, the bytes are deserialized first and
   * kept as an ArrayBuffer with an estimated size; otherwise the buffer itself is kept and its
   * limit is used as the size.
   */
  override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel): Unit = {
    // Work on a duplicate - since the original input might be used elsewhere.
    val bytes = _bytes.duplicate()
    // Reset the position to 0 so that the buffer is read from its start.
    bytes.rewind()
    if (level.deserialized) {
      // The storage level wants deserialized objects, so deserialize before storing.
      val values = blockManager.dataDeserialize(blockId, bytes)
      val elements = new ArrayBuffer[Any]
      elements ++= values
      val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
      tryToPut(blockId, elements, sizeEstimate, true)
    } else {
      tryToPut(blockId, bytes, bytes.limit, false)
    }
  }
/**
   * Stores a block given as deserialized values.
   *
   * What is returned depends on the storage level: for a deserialized level the result carries
   * an iterator over the values and their estimated size, otherwise it carries a duplicate of
   * the serialized buffer and that buffer's limit.
   */
  override def putValues(
      blockId: String,
      values: ArrayBuffer[Any],
      level: StorageLevel,
      returnValues: Boolean)
    : PutResult = {
    if (level.deserialized) {
      val estimatedSize = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
      tryToPut(blockId, values, estimatedSize, deserialized = true)
      PutResult(estimatedSize, Left(values.iterator))
    } else {
      val serialized = blockManager.dataSerialize(blockId, values.iterator)
      tryToPut(blockId, serialized, serialized.limit, deserialized = false)
      PutResult(serialized.limit(), Right(serialized.duplicate()))
    }
  }

  /**
   * Looks the block up and returns it in serialized form: deserialized entries are serialized
   * on the fly, serialized entries are handed out as a duplicate of the stored buffer.
   * Returns None when no entry exists for the block id.
   */
  override def getBytes(blockId: String): Option[ByteBuffer] = {
    // The lookup is done while holding the lock on the map.
    val cached = Option(entries.synchronized { entries.get(blockId) })
    cached.map { entry =>
      if (entry.deserialized) {
        blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
      } else {
        // Doesn't actually copy the data
        entry.value.asInstanceOf[ByteBuffer].duplicate()
      }
    }
  }

  /**
   * Looks the block up and returns an iterator over its values: deserialized entries are
   * iterated directly, serialized entries are deserialized from a duplicate of the stored
   * buffer. Returns None when no entry exists for the block id.
   */
  override def getValues(blockId: String): Option[Iterator[Any]] = {
    // The lookup is done while holding the lock on the map.
    val cached = Option(entries.synchronized { entries.get(blockId) })
    cached.map { entry =>
      if (entry.deserialized) {
        entry.value.asInstanceOf[ArrayBuffer[Any]].iterator
      } else {
        // Doesn't actually copy data
        val view = entry.value.asInstanceOf[ByteBuffer].duplicate()
        blockManager.dataDeserialize(blockId, view)
      }
    }
  }
 
/**
   * Try to put in a set of values, if we can free up enough space. The value should either be
   * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
   * size must also be passed by the caller.
   *
   * Locks on the object putLock to ensure that all the put requests and its associated block
   * dropping is done by only one thread at a time. Otherwise while one thread is dropping
   * blocks to free memory for one block, another thread may use up the freed space for
   * another block.
   *
   * @return true if the block was stored in memory, false if it did not fit and was handed to
   *         blockManager.dropFromMemory instead
   */
  private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
    // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
    // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
    // released, it must be ensured that those to-be-dropped blocks are not double counted for
    // freeing up more space for another block that needs to be put. Only then the actually dropping
    // of blocks (and writing to disk if necessary) can proceed in parallel.
    putLock.synchronized {
      if (ensureFreeSpace(blockId, size)) { // enough memory is available for this block
        val entry = new Entry(value, size, deserialized)
        entries.synchronized { entries.put(blockId, entry) }
        currentMemory += size
        true
      } else { // the block does not fit in memory
        // Tell the block manager that we couldn't put it in memory so that it can drop it to
        // disk if the block allows disk storage.
        val data = if (deserialized) {
          Left(value.asInstanceOf[ArrayBuffer[Any]])
        } else {
          Right(value.asInstanceOf[ByteBuffer].duplicate())
        }
        blockManager.dropFromMemory(blockId, data)
        false
      }
    }
  }

  /**
   * Tries to free up a given amount of space to store a particular block, but can fail and return
   * false if either the block is bigger than our memory or it would require replacing another
   * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
   * don't fit into memory that we want to avoid).
   *
   * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
   * Otherwise, the freed space may fill up before the caller puts in their new value.
   *
   * NOTE(review): currentMemory is not decreased in this method; presumably
   * blockManager.dropFromMemory leads to the entry being removed and the counter being
   * updated elsewhere - confirm against the code not shown here.
   */
  private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
    if (space > maxMemory) {
      logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
      return false
    }
    if (maxMemory - currentMemory < space) {
      val rddToAdd = getRddId(blockIdToAdd)
      val selectedBlocks = new ArrayBuffer[String]()
      var selectedMemory = 0L
      // This is synchronized to ensure that the set of entries is not changed
      // (because of getValue or getBytes) while traversing the iterator, as that
      // can lead to exceptions.
      entries.synchronized {
        // Select existing blocks one by one, in the map's iteration order, until dropping the
        // selected ones would leave enough room for the new block.
        val iterator = entries.entrySet().iterator()
        while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
          val pair = iterator.next()
          val blockId = pair.getKey
          if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
            logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
              "block from the same RDD")
            return false
          }
          selectedBlocks += blockId
          selectedMemory += pair.getValue.size
        }
      }
      if (maxMemory - (currentMemory - selectedMemory) >= space) {
        logInfo(selectedBlocks.size + " blocks selected for dropping")
        for (blockId <- selectedBlocks) { // drop the selected blocks to release their space
          val entry = entries.synchronized { entries.get(blockId) }
          // This should never be null as only one thread should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) {
            val data = if (entry.deserialized) {
              Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
            } else {
              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
            }
            blockManager.dropFromMemory(blockId, data)
          }
        }
        return true
      } else {
        // Even dropping every selectable block would not free enough space.
        return false
      }
    }
    // Enough free memory is already available; nothing has to be dropped.
    return true
  }
 
本文章摘自博客园,原文发布日期:2014-01-09

转载地址:http://vhzeo.baihongyu.com/

你可能感兴趣的文章
iOS数据持久化的方式
查看>>
JQgrid for asp.net 不完全手记
查看>>
ASP.NET-FineUI开发实践-16(二)
查看>>
Visual Studio2012使用技巧
查看>>
编程思想
查看>>
在Hadoop伪分布式模式下安装Hive(derby,mysql)
查看>>
经典布局样式
查看>>
python小白之np功能快速查
查看>>
Authorization Bypass in RSA NetWitness
查看>>
把ISO文件当作光盘挂载
查看>>
Algs4-2.3.26子数组大小直方图
查看>>
C#下Emgucv的配置
查看>>
你未必了解的DNS
查看>>
pycharm的放大和缩小字体的显示 和ubunt的截圖工具使用 ubuntu上安装qq微信等工具...
查看>>
【Java基础】sun.misc.BASE64和Java 8 java.util.Base64区别
查看>>
响应式开发的思路和断点的选择
查看>>
使用PL/SQL连接Oracle时报错ORA-12541: TNS: 无监听程序
查看>>
Mac011--DbWrench Database安装
查看>>
[原]Flash研究(一)——本地通讯
查看>>
bootStrap table 和 JS 开发过程中遇到问题汇总
查看>>