`
ladymaidu
  • 浏览: 678763 次
文章分类
社区版块
存档分类
最新评论

Cassandra源代码分析:数据写入流程

 
阅读更多

org.apache.cassandra.thrift.CassandraServer类的add方法将接受客户端的请求,该函数定义如下:

函数内部实现上首先将kv信息封装成RowMutation对象,之后创建QueryPath对象(主要是对数据进行封转),最后调用doInsert方法执行插入动作,doInsert函数定义如下:

函数内部首先进行数据检查,调用StorageProxy.mutate(mutations, consistency_level);执行数据的插入操作。mute方法定义如下:

对于每个Mutation对象,如果是CounterMutation类型的Mutation的话,首先要确保一个replica的写入成功,之后在向另外的N-1个replicas写入;其他类型的Mutation的话,没有这个要求,做法是首先得到N个replicas节点,向这个N个节点发送命令。

这两种类型的Mutation是通过两个函数mutateCounter和performWrite分别生成的,这里我们仅仅来看一下performWrite的实现:首先得到复制策略,通过复制策略得到所有replica的endpoints,将任务交给代理WritePerformer.apply执行。代码如下:


同时需要注意的是在文件org.apache.cassandra.service.StorageProxy.java中有三个实现而来WritePerformer接口的类,WritePerformer接口定义如下:

也就是说最终完成数据写入任务的是WritePerformer的apply方法。StorageProxy的三个实现该接口的类型如下:

我们分别来看上面的几个实现,standardWritePerformer的实现方式比较简单,对于endpoints的集合,如果该节点还live,那么其发送写命令,如果该节点dead,那么这时执行hinted-handoff策略:

到此我们已经完成了数据从StorageProxy到各个replicas的转发工作,当然这里还存在一些问题,会在下面的继续:

1. 首先replicas收到命令之后的处理动作

2. cassandra中如何生成replicas,如何发现endpoints的拓扑结构,这就涉及到cassandra中nitch的实现

3. cassandra中如何实现DHT?

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics