Kafka源码-网络层
开篇诗两句:
人生若只如初见,何事秋风悲画扇。
等闲变却故人心,却道故人心易变。 ——纳兰性德《木兰花·拟古决绝词柬友》
SocketServer
1.1 SocketServer启动
SocketServer启动之前主要涉及到3个配置:
listeners:broker配置的所有listener(例如:PLAINTEXT://10.42.80.22:9092)
control.plane.listener.name:控制请求使用的listener
listener.security.protocol.map:各个listener对应的安全协议
SocketServer启动过程,首先创建控制请求对应的Acceptor和Processor,这个创建时根据control.plane.listener.name创建的,如果这个参数没有配置,则不会创建对应的Acceptor和Processors,此时,控制请求会使用数据请求的Acceptor和Processor。
创建控制请求对应的线程模型之后,会创建数据模型的线程模型。数据请求的线程模型的创建也是根据listener创建的,从listeners中排除control.plane.listener.name之后,都是数据请求的listener。
def startup(startProcessingRequests: Boolean = true,
controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
this.synchronized {
createControlPlaneAcceptorAndProcessor(controlPlaneListener)
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
if (startProcessingRequests) {
this.startProcessingRequests()
}
}
......
}
1.1.1 控制请求的线程模型和数据请求的线程模型的不同点:
控制请求的线程模型最多只有1个,如果没有配置control.plane.listener.name参数,则不会创建。数据请求的线程模型根据listener配置,可以有多个。
同时,控制请求的线程模型的Acceptor只有1个Processor线程对应,数据请求的线程模型中,Acceptor线程可以对应多个Processor线程,具体数量是通过num.network.threads配置的。
1.1.2 相关参数
控制请求:
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
数据请求:
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
1.2 Acceptor线程
创建Acceptor线程需要涉及两个配置参数:socket.send.buffer.bytes和socket.receive.buffer.bytes
Acceptor线程有两个重要的属性,nioSelector和serverChannel,这两个属性需要在初始化时进行创建。serverChannel就是ServerSocketChannel,创建的时候需要指定socket.receive.buffer.bytes
1.2.1 执行逻辑
Acceptor线程主要的工作是将serverChannel接受的连接请求,分发给Processor线程进行处理,具体逻辑如下:
def run(): Unit = {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
while (isRunning) {
try {
acceptNewConnections()
closeThrottledConnections()
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket, selector, and any throttled sockets.")
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket))
throttledSockets.clear()
shutdownComplete()
}
}
private def acceptNewConnections(): Unit = {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
// Assign the channel to the next processor (using round-robin) to which the
// channel can be added without blocking. If newConnections queue is full on
// all processors, block until the last one is able to accept a connection.
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
retriesLeft -= 1
processor = synchronized {
// adjust the index (if necessary) and retrieve the processor atomically for
// correct behaviour in case the number of processors is reduced dynamically
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
(1)首先在nioSelector上注册OP_ACCEPT事件
(2)循环执行,nioSelector选择准备好的key(这里只注册了一个),如果时请求连接事件,serverChannel接受请求,并将socketChannel提交给Processor线程处理
因为一个Acceptor线程是可能对应多个Processor线程的,分配的时候采用的是round-robin的方式,如果当前选择的Processor不能处理这个请求,便会重试,重试次数为Processor线程的数量。(如果所有的Processor线程都无法处理这个socketChannel,就会阻塞,直到最后一个Processer能够处理这个请求)
1.3 Processor线程
Processor线程有如下几个重要属性:
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
newConnections是一个阻塞队列,主要的作用的Acceptor线程为Processor线程分配链接时,会将SocketChannel放到对应的Processor的这个队列中。
inflightResponses:记录各个connect正在发送的response。
responseQueue也是一个阻塞队列,主要的作用是请求处理完成之后,会将response放到这个队列中,Processor线程会从这个队列中获取response,并注册Selector状态,将response发送出去。
每个Processor线程都有独立的selector来处理READ事件和WRITE事件:
private val selector = createSelector(
ChannelBuilders.serverChannelBuilder(
listenerName,
listenerName == config.interBrokerListenerName,
securityProtocol,
config,
credentialProvider.credentialCache,
credentialProvider.tokenCache,
time,
logContext,
() => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
)
)
1.3.1 执行逻辑
Processor线程的主要执行逻辑如下:
(1)configureNewConnections()方法:主要的工作是从newConnections中获取Acceptor线程分配的SocketChannel,在selector上注册监听OP_READ事件。(这个方法很简单不详细描述)
(2)processNewResponses()方法:主要的工作是从responseQueue中获取response,将response设置为SocketChannel的Send,并设置OP_WRITE事件,后面会详细描述。
(3)poll()方法:执行selector.poll方法,监听各个channel的状态来执行对应的读写事件
(4)processCompletedReceives()方法:将接收到的request添加到requestQueue中,共RequestHandler线程获取,并将channel设置成Mute状态,保证每个channel相同时间只有一个请求在处理
(5)processCompletedSends()方法:Send对象成功发送之后的处理逻辑,调用回调方法,解除Channel的Mute状态
(6)processDisconnected()方法:连接断开的处理逻辑,不详细描述
(7)closeExcessConnections()方法:关闭连接的处理逻辑,不详细描述
override def run(): Unit = {
startupComplete()
try {
while (isRunning) {
try {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
poll()
processCompletedReceives()
processCompletedSends()
processDisconnected()
closeExcessConnections()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. These exceptions are caught and
// processed by the individual methods above which close the failing channel and continue processing other
// channels. So this catch block should only ever see ControlThrowables.
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug(s"Closing selector - processor $id")
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}
1.3.2 processNewResponses()方法
private def processNewResponses(): Unit = {
var currentResponse: RequestChannel.Response = null
while ({currentResponse = dequeueResponse(); currentResponse != null}) {
val channelId = currentResponse.request.context.connectionId
try {
currentResponse match {
case response: NoOpResponse =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
updateRequestMetrics(response)
trace(s"Socket server received empty response to send, registering for read: $response")
// Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
// it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the
// throttling delay has already passed by now.
handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
tryUnmuteChannel(channelId)
case response: SendResponse =>
sendResponse(response, response.responseSend)
case response: CloseConnectionResponse =>
updateRequestMetrics(response)
trace("Closing socket connection actively according to the response code.")
close(channelId)
case _: StartThrottlingResponse =>
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
case _: EndThrottlingResponse =>
// Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
// the client.
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
tryUnmuteChannel(channelId)
case _ =>
throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
}
} catch {
case e: Throwable =>
processChannelException(channelId, s"Exception while processing response for $channelId", e)
}
}
}
processNewResponses()方法主要的工作是从responseQueue中获取response,并根据具体的类型进行不同的处理:
(1)NoOpResponse:表示没有response发送到client,需要触发ChannelMuteEvent.RESPONSE_SENT事件来修改KafkaChannel的状态(KafkaChannel是SocketChannel的封装),之后尝试解除当前Channel的Mute状态,该操作只会在当前状态为ChannelMuteState.MUTED执行,并注册OP_READ事件。(状态转换后面介绍)
(2)SendResponse:表示需要发送response给client,调用sendResponse方法,该方法并不是真的将消息进行发送,而是在Selector上注册OP_WRITE事件。
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
val connectionId = response.request.context.connectionId
trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
// `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
if (channel(connectionId).isEmpty) {
warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
response.request.updateRequestMetrics(0L, response)
}
// Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
// removed from the Selector after discarding any pending staged receives.
// `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
if (openOrClosingChannel(connectionId).isDefined) {
selector.send(new NetworkSend(connectionId, responseSend))
inflightResponses += (connectionId -> response)
}
}
(3)CloseConnectionResponse:关闭链接。
(4)StartThrottlingResponse:设置限流状态。
(5)EndThrottlingResponse:结束限流状态,并且尝试解除Channel的静音状态。
1.3.3 poll()方法
private def poll(): Unit = {
val pollTimeout = if (newConnections.isEmpty) 300 else 0
try selector.poll(pollTimeout)
catch {
case e @ (_: IllegalStateException | _: IOException) =>
// The exception is not re-thrown and any completed sends/receives/connections/disconnections
// from this poll will be processed.
error(s"Processor $id poll failed", e)
}
}
该方法主要是调用selector.poll()方法来处理所有准备完成的Channel事件。
selector.poll()方法代卖比较长,这里没有贴出,该方法主要的工作总结如下:
(1)判断当前缓存池资源是否充足,如果足够,就尝试解除Channel的状态,(要求不能在explicitlyMutedChannels中)
(2)首先判断当前状态需不需要处理数据,判断条件有三个:
a)准备好的SelectionKey的数量大于0
b)需要立即处理的链接列表不为空
c)缓存中存在数据需要读(PLAINTAXT没有缓存数据,加密传输方式会有缓存)
(3)如果满足(1)中的条件,或获取所有需要处理的SelectionKey,然后调用pollSelectionKeys()方法进行数据读取或者数据发送。
对于需要从缓存中读取数据的key,不是通过Selector的selectedKeys方法获取的,而是在上一次的poll操作中,将缓存中存在数据的key保存在了keysWithBufferedRead列表中。
pollSelectionKeys()方法的主要处理逻辑是:
a)首先判断channel的数据是否准备好,(这里的判断是针对加密传入的判断,明文传输方式直接就是准备完成的状态),如果没有准备完成,会执行channel.prepare()。
b)尝试读取数据,读取数据时所使用的缓存是从创建Processor线程时传入缓存池中获取的,如果缓存池资源不足,会Mute当前Channel来缓解内存压力。读取的请求数据会保存到stagedReceives
c)判断当前channel如果存在缓存数据需要读取,添加到keysWithBufferedRead,方便下次poll操作时获取。
d)尝试写消息,将上一步processNewResponses()方法设置的Send对象发送出去,并将发送完成的Send对象保存到completedSends。
(4)关闭超时的连接
(5)从stagedReceives中获取请求的数据,放到completedReceives中,每个Channel每次只能处理一个请求,如果Channel是Mute状态,则跳过操作。
1.3.4 processCompletedReceives()方法
private def processCompletedReceives(): Unit = {
selector.completedReceives.forEach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = parseRequestHeader(receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
() => time.nanoseconds()))
trace(s"Begin re-authentication: $channel")
else {
val nowNanos = time.nanoseconds()
if (channel.serverAuthenticationSessionExpired(nowNanos)) {
// be sure to decrease connection count and drop any in-flight responses
debug(s"Disconnecting expired channel: $channel : $header")
close(channel.id)
expiredConnectionsKilledCount.record(null, 1, 0)
} else {
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol,
channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
// and version. It is done here to avoid wiring things up to the api layer.
if (header.apiKey == ApiKeys.API_VERSIONS) {
val apiVersionsRequest = req.body[ApiVersionsRequest]
if (apiVersionsRequest.isValid) {
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
apiVersionsRequest.data.clientSoftwareVersion))
}
}
requestChannel.sendRequest(req)
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
}
}
case None =>
// This should never happen since completed receives are processed immediately after `poll()`
throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
}
} catch {
// note that even though we got an exception, we can assume that receive.source is valid.
// Issues with constructing a valid receive object were handled earlier
case e: Throwable =>
processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
}
}
selector.clearCompletedReceives()
}
processCompletedReceives()方法主要的工作是从上一步中的completedReceives中获取请求数据,并发送到RequestChannel中
之后将Channel设置成Mute状态,并且添加到explicitlyMutedChannels,并且对当前的Channel设置ChannelMuteEvent.REQUEST_RECEIVED事件。
1.3.5 processCompletedSends()方法
private def processCompletedReceives(): Unit = {
selector.completedReceives.forEach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = parseRequestHeader(receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
() => time.nanoseconds()))
trace(s"Begin re-authentication: $channel")
else {
val nowNanos = time.nanoseconds()
if (channel.serverAuthenticationSessionExpired(nowNanos)) {
// be sure to decrease connection count and drop any in-flight responses
debug(s"Disconnecting expired channel: $channel : $header")
close(channel.id)
expiredConnectionsKilledCount.record(null, 1, 0)
} else {
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol,
channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
// and version. It is done here to avoid wiring things up to the api layer.
if (header.apiKey == ApiKeys.API_VERSIONS) {
val apiVersionsRequest = req.body[ApiVersionsRequest]
if (apiVersionsRequest.isValid) {
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
apiVersionsRequest.data.clientSoftwareVersion))
}
}
requestChannel.sendRequest(req)
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
}
}
case None =>
// This should never happen since completed receives are processed immediately after `poll()`
throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
}
} catch {
// note that even though we got an exception, we can assume that receive.source is valid.
// Issues with constructing a valid receive object were handled earlier
case e: Throwable =>
processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
}
}
selector.clearCompletedReceives()
}
首先从inflightResponses中移除当前已经成功发送的请求,然后调用回调方法。
对当前的Channel处理ChannelMuteEvent.RESPONSE_SENT事件,并且尝试解除Channel的Mute状态,并且从explicitlyMutedChannels中移除。