From d6653f1f3461a4e0c43fc307efe215405aab6f4c Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 24 Dec 2015 15:34:18 +0800 Subject: [PATCH] add support for get committed offset of the consumer group for kafka --- .../actor/cluster/KafkaStateActor.scala | 247 ++++++++++++++++-- app/kafka/manager/model/ActorModel.scala | 1 + .../manager/utils/zero82/ClientUtils.scala | 128 --------- build.sbt | 3 +- 4 files changed, 233 insertions(+), 146 deletions(-) delete mode 100644 app/kafka/manager/utils/zero82/ClientUtils.scala diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 89c1e160c..ab2d49f8d 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -6,13 +6,22 @@ package kafka.manager.actor.cluster import java.util.concurrent.TimeUnit +import java.util.Properties import akka.pattern._ +import akka.actor.{ActorRef, Cancellable, ActorPath} import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import grizzled.slf4j.Logging import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition import kafka.consumer.SimpleConsumer +import kafka.admin._ +import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.utils.Utils import kafka.manager._ import kafka.manager.base.cluster.BaseClusterQueryCommandActor import kafka.manager.base.{LongRunningPoolActor, LongRunningPoolConfig} @@ -27,7 +36,8 @@ import org.apache.curator.framework.recipes.cache._ import org.joda.time.{DateTime, DateTimeZone} import scala.collection.mutable -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Await} +import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} /** @@ -140,7 +150,7 @@ trait OffsetCache extends Logging { protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] - protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] + def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long] @@ -239,7 +249,22 @@ trait OffsetCache extends Logging { ConsumedTopicDescription(consumer, topic, numPartitions, optTopic, partitionOwners, partitionOffsets) } - def getConsumerList: ConsumerList + def getConsumerList(adminClient: AdminClient, isCloseClient: Boolean): ConsumerList + + def getConsumerListByKafka(adminClient: AdminClient): IndexedSeq[String] = { + var consumerGroupList: List[String] = List() + var groupOverviewList: List[GroupOverview] = adminClient.listAllConsumerGroupsFlattened() + groupOverviewList match { + case Nil => IndexedSeq.empty + case l: List[GroupOverview] => { + l.foreach { + x => consumerGroupList = x.groupId :: consumerGroupList + } + } + } + groupOverviewList = null + consumerGroupList.toIndexedSeq + } } case class OffsetCacheActive(curator: CuratorFramework, @@ -282,7 +307,7 @@ case class OffsetCacheActive(curator: CuratorFramework, protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic) - protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive) + def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive) def start(): Unit = { info("Starting consumers tree cache...") @@ -352,12 +377,19 @@ case class OffsetCacheActive(curator: CuratorFramework, topicDescriptions.map(ConsumerDescription(consumer, _)) }*/ - def getConsumerList: ConsumerList = { + def getConsumerList(adminClient: AdminClient, isCloseClient: Boolean): ConsumerList = { + var sumConsumerList = getConsumerListByKafka(adminClient) withConsumersTreeCache { cache => cache.getCurrentChildren(ZkUtils.ConsumersPath) }.fold { - ConsumerList(IndexedSeq.empty, clusterContext) + if (isCloseClient) { + adminClient.close() + } + ConsumerList(IndexedSeq.empty.++:(sumConsumerList), clusterContext) } { data: java.util.Map[String, ChildData] => + if (isCloseClient) { + adminClient.close() + } val filteredList: IndexedSeq[String] = data.asScala.filter{ case (consumer, childData) => if (clusterContext.config.filterConsumers) @@ -365,7 +397,7 @@ case class OffsetCacheActive(curator: CuratorFramework, childData.getStat.getNumChildren > 2 else true }.keySet.toIndexedSeq - ConsumerList(filteredList, clusterContext) + ConsumerList(filteredList.++:(sumConsumerList).toSet.toIndexedSeq, clusterContext) } } } @@ -410,7 +442,7 @@ case class OffsetCachePassive(curator: CuratorFramework, protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic) - protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive) + def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive) def start(): Unit = { info("Starting consumers path children cache...") @@ -469,14 +501,21 @@ case class OffsetCachePassive(curator: CuratorFramework, Try(Option(curator.getChildren.forPath(zkPath)).map(_.asScala.toSet)).toOption.flatten.getOrElse(Set.empty) } - def getConsumerList: ConsumerList = { + def getConsumerList(adminClient: AdminClient, isCloseClient: Boolean): ConsumerList = { + val sumConsumerList = getConsumerListByKafka(adminClient) withConsumersPathChildrenCache { cache => val currentData = cache.getCurrentData currentData }.fold { - ConsumerList(IndexedSeq.empty, clusterContext) + if (isCloseClient) { + adminClient.close() + } + ConsumerList(IndexedSeq.empty.++:(sumConsumerList), clusterContext) } { data: java.util.List[ChildData] => - ConsumerList(data.asScala.map(cd => cd.getPath.split("/").last).toIndexedSeq, clusterContext) + if (isCloseClient) { + adminClient.close() + } + ConsumerList(data.asScala.map(cd => cd.getPath.split("/").last).toIndexedSeq.++:(sumConsumerList).toSet.toIndexedSeq, clusterContext) } } } @@ -487,6 +526,8 @@ case class KafkaStateActorConfig(curator: CuratorFramework, partitionOffsetCacheTimeoutSecs: Int, simpleConsumerSocketTimeoutMillis: Int) class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor { + private val consumerInfoByKafka: Duration = 10 seconds + protected implicit val clusterContext: ClusterContext = config.clusterContext protected implicit val cf: ClusterFeatures = clusterContext.clusterFeatures @@ -581,7 +622,19 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom } } } - + + private def createAdminClient(): AdminClient = { + val targetBrokers : IndexedSeq[BrokerIdentity] = getBrokers + var brokerListStr = "" + targetBrokers.foreach { + b => { + brokerListStr += "%s:%d,".format(b.host, b.port) + } + } + + AdminClient.createSimplePlaintext(brokerListStr) + } + private[this] val offsetCache: OffsetCache = { if(config.clusterContext.config.activeOffsetCacheEnabled) new OffsetCacheActive( @@ -603,6 +656,9 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom config.clusterContext.config.version)(longRunningExecutionContext, cf) } + private[this] var cancellable : Option[Cancellable] = None + private[this] var consumerByKafkaFuture: Future[Option[List[ConsumerDescription]]] = null + @scala.throws[Exception](classOf[Exception]) override def preStart() = { log.info(config.toString) @@ -626,6 +682,13 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom //the offset cache does not poll on its own so it can be started safely log.info("Starting offset cache...") offsetCache.start() + + cancellable = Some( + context.system.scheduler.schedule(0 seconds, + 10 seconds, + self, + KSForceUpdateConsumerByKafka)(context.system.dispatcher,self) + ) } @scala.throws[Exception](classOf[Exception]) @@ -764,7 +827,11 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom case KSGetConsumers => asyncPipeToSender { - offsetCache.getConsumerList + var client = createAdminClient() + val consumerList: ConsumerList = offsetCache.getConsumerList(client, false) + client.close() + client = null + consumerList } case KSGetTopicConfig(topic) => @@ -778,12 +845,45 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom case KSGetConsumerDescription(consumer) => asyncPipeToSender { - offsetCache.getConsumerDescription(consumer) + var currentCD: ConsumerDescription = offsetCache.getConsumerDescription(consumer) + + var topicConsumerMap = currentCD.topics + + Await.ready(consumerByKafkaFuture, consumerInfoByKafka).value.get match { + case Success(consumerDescriptionList) => + consumerDescriptionList.map { + consumerListByKafka => + consumerListByKafka.filter(_.consumer == consumer).map { + filter => { + topicConsumerMap = topicConsumerMap.++:(filter.topics) + } + } + } + case Failure(e) => + topicConsumerMap + } + + ConsumerDescription(consumer, topicConsumerMap) } case KSGetConsumedTopicDescription(consumer, topic) => asyncPipeToSender { - offsetCache.getConsumedTopicDescription(consumer, topic, true) + var consumedTD = offsetCache.getConsumedTopicDescription(consumer, topic, true) + + Await.ready(consumerByKafkaFuture, consumerInfoByKafka).value.get match { + case Success(consumerDescriptionList) => + consumerDescriptionList.map { + consumerListByKafka => + consumerListByKafka.filter(_.consumer == consumer).map { + filter => { + consumedTD = filter.topics.get(topic).getOrElse(consumedTD) + } + } + } + case Failure(e) => + } + + consumedTD } case KSGetAllTopicDescriptions(lastUpdateMillisOption) => @@ -804,10 +904,31 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom val lastUpdateMillis = lastUpdateMillisOption.getOrElse(0L) if (offsetCache.lastUpdateMillis > lastUpdateMillis) { asyncPipeToSender { + var client = createAdminClient() ConsumerDescriptions(offsetCache - .getConsumerList + .getConsumerList(client, true) .list - .map(c => offsetCache.getConsumerDescription(c)), offsetCache.lastUpdateMillis) + .flatMap(c => { + + var topicConsumerMap = offsetCache.getConsumerDescription(c).topics + + Await.ready(consumerByKafkaFuture, consumerInfoByKafka).value.get match { + case Success(consumerDescriptionList) => + consumerDescriptionList.map { + consumerListByKafka => + consumerListByKafka.filter(_.consumer == c).map { + filter => { + topicConsumerMap = topicConsumerMap.++:(filter.topics) + } + } + } + case Failure(e) => + topicConsumerMap + } + + Option(ConsumerDescription(c, topicConsumerMap)) + }), + offsetCache.lastUpdateMillis) } } @@ -823,6 +944,10 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom case KSGetReassignPartition => sender ! reassignPartitions + case KSForceUpdateConsumerByKafka => + consumerByKafkaFuture = null + consumerByKafkaFuture = getConsumerDescriptionByKafka() + case any: Any => log.warning("ksa : processQueryRequest : Received unknown message: {}", any.toString) } } @@ -894,5 +1019,93 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom Option(fn(topicsTreeCache)) } + private[this] def getConsumerDescriptionByKafka(): Future[Option[List[ConsumerDescription]]] = { + def createNewConsumer(group: String): KafkaConsumer[String, String] = { + val properties = new Properties() + val deserializer = (new StringDeserializer).getClass.getName + val targetBrokers : IndexedSeq[BrokerIdentity] = getBrokers + var brokerListStr = "" + targetBrokers.foreach { + b => { + brokerListStr += "%s:%d,".format(b.host, b.port) + } + } + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerListStr) + properties.put(ConsumerConfig.GROUP_ID_CONFIG, group) + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) + + new KafkaConsumer(properties) + } + + implicit val ec = longRunningExecutionContext + var consumerByKafkaFuture: Future[Option[List[ConsumerDescription]]] = Future { + var client = createAdminClient() + offsetCache.getConsumerListByKafka(client).toList match { + case Nil => { + client.close() + client = null + None + } + case l: List[String] => { + var descriptions: List[ConsumerDescription] = List() + + l.foreach { + groupId => { + var comsumedTopicDescriptions: Map[String, ConsumedTopicDescription] = Map() + + val consumerSummaries = client.describeConsumerGroup(groupId) + if (!consumerSummaries.isEmpty) { + val consumer = createNewConsumer(groupId) + + consumerSummaries.foreach { consumerSummary => + val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) + val partitionOffsets = topicPartitions.flatMap { topicPartition => + Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => + topicPartition -> offsetAndMetadata.offset + } + }.toMap + + descriptions = descriptions.:+(ConsumerDescription(groupId, + topicPartitions.map { + tp => (tp.topic, tp.partition) + }.groupBy(_._1).map { + case (topic, tpl) => { + (topic, + ConsumedTopicDescription(groupId, + topic, + tpl.size, + offsetCache.getTopicDescription(topic, false), + Option(tpl.map { + p => (p._2, s"${consumerSummary.clientId}_${consumerSummary.clientHost}") + }.toMap), + Option(tpl.map { + p => (p._2, partitionOffsets.get(TopicAndPartition(topic, p._2)).get) + }.toMap))) + } + })) + }//foreach + consumer.close() + }//isEmpty + }//groupId + }//foreach + if (!descriptions.isEmpty) { + client.close() + client = null + Some(descriptions) + } else { + client.close() + client = null + None + } + }//List + }//match + }//Future + consumerByKafkaFuture.recover { case t => None } + + consumerByKafkaFuture + } } diff --git a/app/kafka/manager/model/ActorModel.scala b/app/kafka/manager/model/ActorModel.scala index e24b68f6c..76fd606d3 100644 --- a/app/kafka/manager/model/ActorModel.scala +++ b/app/kafka/manager/model/ActorModel.scala @@ -150,6 +150,7 @@ object ActorModel { case class KMCommandResult(result: Try[Unit]) extends CommandResponse sealed trait KSRequest extends QueryRequest + case object KSForceUpdateConsumerByKafka extends KSRequest case object KSGetTopics extends KSRequest case object KSGetConsumers extends KSRequest case class KSGetTopicConfig(topic: String) extends KSRequest diff --git a/app/kafka/manager/utils/zero82/ClientUtils.scala b/app/kafka/manager/utils/zero82/ClientUtils.scala deleted file mode 100644 index d97fa6424..000000000 --- a/app/kafka/manager/utils/zero82/ClientUtils.scala +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.manager.utils.zero82 - -import java.io.IOException - -import grizzled.slf4j.Logging -import kafka.api.{ConsumerMetadataResponse, ConsumerMetadataRequest} -import kafka.cluster.Broker -import kafka.common.ErrorMapping -import kafka.network.BlockingChannel - -import scala.util.Random - -/** - * Borrowed from kafka 0.8.2.1 - * https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java - */ -object ClientUtils extends Logging { - - /** - * Creates a blocking channel to a random broker - */ - def channelToAnyBroker(allBrokers: Seq[Broker], socketTimeoutMs: Int = 3000) : BlockingChannel = { - var channel: BlockingChannel = null - var connected = false - while (!connected) { - Random.shuffle(allBrokers).find { broker => - trace("Connecting to broker %s:%d.".format(broker.host, broker.port)) - try { - channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) - channel.connect() - debug("Created channel to broker %s:%d.".format(channel.host, channel.port)) - true - } catch { - case e: Exception => - if (channel != null) channel.disconnect() - channel = null - info("Error while creating channel to %s:%d.".format(broker.host, broker.port)) - false - } - } - connected = if (channel == null) false else true - } - - channel - } - - /** - * Creates a blocking channel to the offset manager of the given group - */ - def channelToOffsetManager(group: String, allBrokers: Seq[Broker], socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = { - var queryChannel = channelToAnyBroker(allBrokers) - - var offsetManagerChannelOpt: Option[BlockingChannel] = None - - while (!offsetManagerChannelOpt.isDefined) { - - var coordinatorOpt: Option[Broker] = None - - while (!coordinatorOpt.isDefined) { - try { - if (!queryChannel.isConnected) - queryChannel = channelToAnyBroker(allBrokers) - debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) - queryChannel.send(ConsumerMetadataRequest(group)) - val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) - debug("Consumer metadata response: " + consumerMetadataResponse.toString) - if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) - coordinatorOpt = consumerMetadataResponse.coordinatorOpt - else { - debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds." - .format(queryChannel.host, queryChannel.port, group, retryBackOffMs)) - Thread.sleep(retryBackOffMs) - } - } - catch { - case ioe: IOException => - info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port)) - queryChannel.disconnect() - } - } - - val coordinator = coordinatorOpt.get - if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) { - offsetManagerChannelOpt = Some(queryChannel) - } else { - val connectString = "%s:%d".format(coordinator.host, coordinator.port) - var offsetManagerChannel: BlockingChannel = null - try { - debug("Connecting to offset manager %s.".format(connectString)) - offsetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port, - BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, - socketTimeoutMs) - offsetManagerChannel.connect() - offsetManagerChannelOpt = Some(offsetManagerChannel) - queryChannel.disconnect() - } - catch { - case ioe: IOException => // offsets manager may have moved - info("Error while connecting to %s.".format(connectString)) - if (offsetManagerChannel != null) offsetManagerChannel.disconnect() - Thread.sleep(retryBackOffMs) - offsetManagerChannelOpt = None // just in case someone decides to change shutdownChannel to not swallow exceptions - } - } - } - - offsetManagerChannelOpt.get - } - -} diff --git a/build.sbt b/build.sbt index 9fca14f6c..1fc083ddb 100644 --- a/build.sbt +++ b/build.sbt @@ -35,7 +35,8 @@ libraryDependencies ++= Seq( "org.slf4j" % "log4j-over-slf4j" % "1.7.12", "com.adrianhurt" %% "play-bootstrap3" % "0.4.5-P24", "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.apache.kafka" %% "kafka" % "0.8.2.2" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + //"org.apache.kafka" %% "kafka" % "0.8.2.2" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.apache.kafka" %% "kafka" % "0.9.0.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), "org.scalatest" %% "scalatest" % "2.2.1" % "test", "org.apache.curator" % "curator-test" % "2.9.1" % "test", "org.mockito" % "mockito-core" % "1.10.19" % "test",