NoHostAvailableException when many concurrent Spark threads read one table in Cassandra

苏放
Hello,

I am using Spark 1.2 and Cassandra 2.0.12. The table I want to read has 20 million rows.

When I use 10 Spark threads to read from Cassandra, it works fine:
val sc = new SparkContext("local[10]", "tungsten", conf)
When I use 40 Spark threads, the job crashes with the exception shown at the end of this post:
val sc = new SparkContext("local[40]", "tungsten", conf)
I then changed the concurrent_reads parameter in cassandra.yaml from 32 to 320.

With that change, the job works with 40 Spark threads. But when I increased the thread count to 80, the exception happened again.

It looks like a configuration problem, but how can I get past it?

Thanks
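
Note: the `conf` object passed to `SparkContext` is not shown in this post. The following is only a minimal sketch of what such a configuration might look like, assuming the Spark Cassandra Connector 1.x property names `spark.cassandra.connection.host`, `spark.cassandra.read.timeout_ms` and `spark.cassandra.input.page.row.size`; these should be checked against the connector version actually in use. The host is taken from the stack trace below, the object name `ReadSketch` is hypothetical, and the timeout and page-size values are illustrative assumptions, not tested recommendations.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical wrapper object; not part of the original post.
object ReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Contact point: one of the two nodes named in the stack trace.
      .set("spark.cassandra.connection.host", "169.254.100.3")
      // Assumed value: how long the driver waits for a read response, in ms.
      .set("spark.cassandra.read.timeout_ms", "60000")
      // Assumed value: number of rows fetched per page by each read.
      .set("spark.cassandra.input.page.row.size", "500")

    // Same master, app name and constructor form as in the post.
    val sc = new SparkContext("local[40]", "tungsten", conf)
    try {
      // Print the effective settings so they can be compared between runs.
      println(sc.getConf.toDebugString)
    } finally {
      sc.stop()
    }
  }
}
```

Whether a longer read timeout or smaller pages would actually remove the exception at 80 threads is not established here. The sketch only shows where client-side settings of this kind would go, alongside the server-side concurrent_reads change described above.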

15/03/23 20:25:43 WARN TaskSetManager: Lost task 222.0 in stage 0.0 (TID 222, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 240.0 in stage 0.0 (TID 240, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 241.0 in stage 0.0 (TID 241, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 228.0 in stage 0.0 (TID 228, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 157 in stage 0.0 failed 1 times, most recent failure: Lost task 157.0 in stage 0.0 (TID 157, localhost): java.io.IOException: Exception during execution of SELECT count(*) FROM "tungsten"."cudb_dn" WHERE token("entry_key") > ? AND token("entry_key") <= ?   ALLOW FILTERING: All host(s) tried for query failed (tried: /169.254.100.4:9042 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response), /169.254.100.3:9042 (com.datastax.driver.core.TransportException: [/169.254.100.3:9042] Connection has been closed))
        at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:433)
        at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$20.apply(CassandraRDD.scala:447)
        at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$20.apply(CassandraRDD.scala:447)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
        at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:869)
        at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:868)
        at org.apache.spark.SparkContext$$anonfun$30.apply(SparkContext.scala:1389)
        at org.apache.spark.SparkContext$$anonfun$30.apply(SparkContext.scala:1389)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /169.254.100.4:9042 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response), /169.254.100.3:9042 (com.datastax.driver.core.TransportException: [/169.254.100.3:9042] Connection has been closed))
        at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
        at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
        at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
        at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
        at com.sun.proxy.$Proxy11.execute(Unknown Source)
        at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:424)
        ... 14 more
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /169.254.100.4:9042 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response), /169.254.100.3:9042 (com.datastax.driver.core.TransportException: [/169.254.100.3:9042] Connection has been closed))
        at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
        at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:210)
        ... 3 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)