Build a Spark cluster

Install pssh to batch-configure the Spark slaves

wget https://storage.googleapis.com/google-code-archive-downloads/v2/code.google.com/parallel-ssh/pssh-2.3.1.tar.gz

tar -xzvf pssh-2.3.1.tar.gz

cd pssh-2.3.1

python setup.py install

For usage, see the pssh documentation.
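
A couple of typical invocations as a minimal sketch (hosts is the host-list file created in the next section; -i prints each host's output inline):

#run a command on every host in the list
pssh -h hosts -i "uname -a"
#copy a local file to the same path on every host
pscp -h hosts /etc/hosts /etc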

System config on the master

/work/hosts

root@192.168.4.210
root@192.168.4.211
root@192.168.4.212
root@192.168.4.213
root@192.168.4.214
root@192.168.4.215
root@192.168.4.216
root@192.168.4.217
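
pssh and pscp rely on non-interactive SSH from the master to every node; a minimal sketch of setting that up with key-based auth, assuming root password logins are still allowed at this point:

#generate a key on the master (no passphrase) and push it to every node
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
for ip in 192.168.4.{210..217}; do
  ssh-copy-id root@$ip
done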

/etc/hosts

127.0.0.1       localhost
127.0.1.1 ubuntu

# The following lines are desirable for IPv6 capable hosts
::1 localhost ip6-localhost ip6-loopback
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

192.168.4.210 n0
192.168.4.211 n1
192.168.4.212 n2
192.168.4.213 n3
192.168.4.214 n4
192.168.4.215 n5
192.168.4.216 n6
192.168.4.217 n7

/etc/environment

LANGUAGE="zh_CN:zh:en_US:en"
LANG=zh_CN.GBK
SPARK_HOME=/work/spark
SCALA_HOME=/opt/scala
JAVA_HOME=/opt/java
J2SDKDIR=/opt/java
J2REDIR=/opt/java/jre
DERBY_HOME=/opt/java/db
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/opt/java/bin:/opt/scala/bin:/work/spark/bin"
CLASSPATH=..:/opt/java/lib:/opt/java/jre/lib:/opt/scala/lib:/work/spark/jars

/work/spark/conf/spark-env.sh (change ens3 to your network interface)

export SPARK_LOCAL_IP=$(ifconfig ens3 | grep "inet addr:" | awk '{print $2}' | cut -c 6-)
export SPARK_MASTER_HOST=n1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8000
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=/work/spark/logs-event"
#export SPARK_EXECUTOR_CORES=1
#export SPARK_EXECUTOR_MEMORY=512M
#export SPARK_WORKER_INSTANCES=4
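
The SPARK_HISTORY_OPTS line above only tells the history server where to read logs; for apps to actually write event logs there, something like the following is presumably also needed (spark.eventLog.* are standard Spark settings and start-history-server.sh ships with Spark):

#enable event logging in conf/spark-defaults.conf
cat >> /work/spark/conf/spark-defaults.conf <<'EOF'
spark.eventLog.enabled true
spark.eventLog.dir /work/spark/logs-event
EOF
#start the history server on the node that holds the logs
/work/spark/sbin/start-history-server.sh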

update_hostname.sh, which changes the default hostname ubuntu to n{last digit of the machine IP}

#!/usr/bin/env bash
NODE_LOCAL_IP=$(ifconfig ens3 | grep "inet addr:" | awk '{print $2}' | cut -c 6-)
#e.g. 192.168.4.211 -> n1 (takes the last digit of the IP)
NEW_HOSTNAME="n${NODE_LOCAL_IP:12}"

#echo $NEW_HOSTNAME > /proc/sys/kernel/hostname
#echo $NEW_HOSTNAME > /etc/hostname

sed -i 's/127.0.1.1.*/127.0.1.1\t'"$NEW_HOSTNAME"'/g' /etc/hosts
hostnamectl set-hostname $NEW_HOSTNAME

Init master and slaves from the master

pssh -h hosts "apt update"
pssh -h hosts "apt install htop -y"

#zsh init
pssh -h hosts "apt install zsh -y"
pssh -h hosts 'sh -c "$(curl -fsSL https://raw.github.com/robbyrussell/oh-my-zsh/master/tools/install.sh)";'
pssh -h hosts 'chsh -s $(which zsh)'

#update dns resolve
pscp -h hosts /etc/resolvconf/resolv.conf.d/base /etc/resolvconf/resolv.conf.d/base
pssh -h hosts "resolvconf -u"

#host update
pscp -h hosts /etc/hosts /etc

#create work directory
pssh -h hosts "mkdir /work"

#copy from master: 192.168.4.210
#please remove 192.168.4.210 from hosts

tar -zxvf jdk-8u172-linux-x64.tar.gz -C /opt
tar -zxvf scala-2.11.12.tgz -C /opt
mv /opt/jdk1.8.0_172 /opt/java
mv /opt/scala-2.11.12 /opt/scala

tar -zxvf spark-2.3.0-bin-hadoop2.7.tgz -C /work
mv /work/spark-2.3.0-bin-hadoop2.7 /work/spark

#copy java, scala
pscp -r -h hosts /opt/* /opt/
#copy spark
pscp -r -h hosts /work/spark /work

#update env variables
pscp -h hosts /etc/environment /etc
pssh -h hosts "source /etc/environment"

#update hostname
pscp -h hosts update_hostname.sh /tmp/
pssh -h hosts "./tmp/update_hostname.sh"

Hadoop config on the master

Hadoop is introduced here so that the slaves can pull the application resources from Hadoop when starting the app;

of course, you could also just copy the jars to the same path on every slave and start from there.

Later I found that on every app restart, Spark pulls the full set of resources from Hadoop into the spark/work directory again.

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>file:/work/hadoop/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://n0:9820</value>
  </property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/work/hadoop/tmp/dfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/work/hadoop/tmp/dfs/datanode</value>
  </property>
  <property>
    <name>dfs.http.address</name>
    <value>n0:9870</value>
  </property>
  <property>
    <name>dfs.permissions.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.permissions.superusergroup</name>
    <value>hadoop</value>
  </property>
  <property>
    <name>hadoop.http.staticuser.user</name>
    <value>hadoop</value>
  </property>
  <property>
    <name>dfs.secondary.http.address</name>
    <value>n0:9868</value>
  </property>
  <property>
    <name>dfs.datanode.http.address</name>
    <value>n0:9864</value>
  </property>
  <property>
    <name>dfs.datanode.address</name>
    <value>n0:9866</value>
  </property>
  <property>
    <name>dfs.datanode.hostname</name>
    <value>n0</value>
  </property>
</configuration>
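
Depending on how the daemons are launched, Hadoop may also need JAVA_HOME set in its own env script; a minimal sketch, assuming Hadoop is unpacked at /work/hadoop and Java at /opt/java as above:

echo 'export JAVA_HOME=/opt/java' >> /work/hadoop/etc/hadoop/hadoop-env.sh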

Init Hadoop
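
Before the HDFS directories below can be created, the NameNode presumably has to be formatted and the HDFS daemons started; a minimal sketch, assuming Hadoop lives in /work/hadoop:

#format the namenode (only once)
/work/hadoop/bin/hdfs namenode -format
#start namenode/datanode daemons
/work/hadoop/sbin/start-dfs.sh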

#create hadoop user in master
sudo useradd -m hadoop -s /bin/bash

#/work/hadoop/bin
./hdfs dfs -mkdir -p /sparkHistoryLogs
./hdfs dfs -mkdir -p /eventLogs
./hdfs dfs -mkdir -p /spark

#./hdfs dfs -rm -R /spark/app/*

copy_app_resouces_to_hadoop.sh, run as the hadoop user

#!/bin/bash
cd /work/hadoop/bin
#./hdfs dfs -mkdir -p /spark
./hdfs dfs -rm -R /spark/app/*
./hdfs dfs -copyFromLocal -f /work/spark/app/log4j.properties /spark/app
#spark config
./hdfs dfs -copyFromLocal -f /work/spark/app/default.conf /spark/app
#app dependencies
./hdfs dfs -copyFromLocal -f /work/spark/app/lib /spark/app
./hdfs dfs -copyFromLocal -f /work/spark/app/node-quality-streaming-0.0.1-SNAPSHOT.jar /spark/app
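
With the resources on HDFS, the app can be submitted so that the executors fetch everything from HDFS; a hedged sketch (the main class me.codz.Main is a placeholder, paths and master URL follow the configs above):

/work/spark/bin/spark-submit \
  --master spark://n1:7077 \
  --class me.codz.Main \
  --files hdfs://n0:9820/spark/app/log4j.properties,hdfs://n0:9820/spark/app/default.conf \
  hdfs://n0:9820/spark/app/node-quality-streaming-0.0.1-SNAPSHOT.jar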

Exception solutions

  • Fix Spark Worker WebUI hostname (log links) pointing to the master machine's hostname

    Looking at the source, setting SPARK_LOCAL_HOSTNAME in spark-env.sh did not take effect.

    Solution: after setting SPARK_PUBLIC_DNS, the links in the worker web UI point to the correct host.

    SPARK_PUBLIC_DNS ends up as publicHostName in the code; go figure.

    As shown in the figure below, the stdout links originally pointed to host n0, the machine where the master resides:

    spark-worker-ui-hostname.png

    Source code reference:

    core/src/main/scala/org/apache/spark/ui/WebUI.scala

    protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
      conf.get(DRIVER_HOST_ADDRESS))

    /** Return the url of web interface. Only valid after bind(). */
    def webUrl: String = s"http://$publicHostName:$boundPort"

    core/src/main/scala/org/apache/spark/util/Utils.scala

    private lazy val localIpAddress: InetAddress = findLocalInetAddress()
    private var customHostname: Option[String] = sys.env.get("SPARK_LOCAL_HOSTNAME")

    /**
     * Get the local machine's URI.
     */
    def localHostNameForURI(): String = {
      customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
    }
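
    A minimal sketch of the fix, assuming it is added to /work/spark/conf/spark-env.sh on every node (SPARK_PUBLIC_DNS is a standard Spark variable; using the node's own hostname relies on the nX naming set up earlier):

    #publish each node's own hostname in its web UI links
    export SPARK_PUBLIC_DNS=$(hostname)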
  • spark ConsumerRecord NotSerializableException bug

    java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = hi2, partition = 4, offset = 385, CreateTime = 1526369397516, checksum = 2122851237, serialized key size = -1, serialized value size = 45, key = null, value = {"date":1526369397516,"message":"0hh2KcCH4j"}))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 125)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    Solution

    Set SparkConf:

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.kryo.registrator", "me.codz.registrator.CunstomRegistrator");

    Create CunstomRegistrator:

    package me.codz.registrator;

    import com.esotericsoftware.kryo.Kryo;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.serializer.KryoRegistrator;

    public class CunstomRegistrator implements KryoRegistrator {

        @Override
        public void registerClasses(Kryo kryo) {
            kryo.register(ConsumerRecord.class);
        }
    }
  • spark TaskContext.get() causes NullPointerException

    stream.foreachRDD((VoidFunction2<JavaRDD<ConsumerRecord<String, String>>, Time>) (v1, v2) -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) v1.rdd()).offsetRanges();

        List<ConsumerRecord<String, String>> consumerRecordList = CollectionTools.emptyWrapper(v1.collect());
        consumerRecordList.forEach(consumerRecord -> {
            TaskContext taskContext = TaskContext.get();
            int partitionId = taskContext.partitionId();
            OffsetRange o = offsetRanges[partitionId];

            //...
        });
    });

    Solution
    Use foreachPartition: v1.collect() runs the forEach on the driver, where TaskContext.get() returns null, while foreachPartition runs on the executors, where a task context exists.

    v1.foreachPartition(consumerRecordIterator -> {
        while (consumerRecordIterator.hasNext()) {
            ConsumerRecord<String, String> consumerRecord = consumerRecordIterator.next();

            //...
        }

        TaskContext taskContext = TaskContext.get();
        int partitionId = taskContext.partitionId();
        OffsetRange offsetRange = offsetRanges[partitionId];

        //...
    });
    }); //closes the enclosing foreachRDD
  • Spark app hangs when connecting to the Kafka server by IP

    2018-06-07 18:40:16 [ForkJoinPool-1-worker-5] INFO :: [Consumer clientId=consumer-1, groupId=node-quality-streaming] Discovered group coordinator lau.cc:9092 (id: 2147483647 rack: null)
    2018-06-07 18:40:18 [ForkJoinPool-1-worker-5] INFO :: [Consumer clientId=consumer-1, groupId=node-quality-streaming] Group coordinator lau.cc:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery

    Solution

    vim kafka/config/server.properties

    #add the following line
    advertised.host.name=192.168.3.20

    Note: kafka advertised.host.name has been DEPRECATED since 0.10.x; from the 0.10.0 broker configs documentation:

    DEPRECATED: only used when `advertised.listeners` or `listeners` are not set. Use `advertised.listeners` instead. Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, it will use the value for `host.name` if configured. Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName().

    so the following config also works:

    listeners=PLAINTEXT://192.168.3.20:9092
    advertised.listeners=PLAINTEXT://192.168.3.20:9092
  • Noted for later (unresolved):

    java.lang.OutOfMemoryError: unable to create new native thread

    Offsets out of range with no configured reset policy for partitions

Zookeeper cluster init

Download the package from here

tar -zxvf zookeeper-3.4.12.tar.gz -C /work
mv /work/zookeeper-3.4.12 /work/zookeeper

cd /work/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg

vim conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/work/zookeeper/data
clientPort=2181

server.0=192.168.4.210:2888:3888
server.1=192.168.4.211:2888:3888
server.2=192.168.4.212:2888:3888
server.3=192.168.4.213:2888:3888
server.4=192.168.4.214:2888:3888
server.5=192.168.4.215:2888:3888
server.6=192.168.4.216:2888:3888
server.7=192.168.4.217:2888:3888
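
The zookeeper directory itself presumably also has to be pushed to every node before the cluster-wide commands below, using the same pscp pattern as earlier:

pscp -r -h /work/hosts /work/zookeeper /work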

bin/init_myid.sh (change ens3 to your network interface)

#!/bin/bash
NODE_LOCAL_IP=$(ifconfig ens3 | grep "inet addr:" | awk '{print $2}' | cut -c 6-)
mkdir -p /work/zookeeper/data
#myid must match the server.N entry in zoo.cfg, i.e. the last digit of the IP (192.168.4.213 -> 3)
echo "${NODE_LOCAL_IP: -1}" > /work/zookeeper/data/myid

init cluster myid

pssh -h /work/hosts "/work/zookeeper/bin/init_myid.sh"

start zookeeper cluster

pssh -h /work/hosts -o out "/work/zookeeper/bin/zkServer.sh start"
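
To verify, zkServer.sh status can be run across the cluster (-i makes pssh print each node's output inline):

pssh -h /work/hosts -i "/work/zookeeper/bin/zkServer.sh status"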

References