博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HAProxy+Hive构建高可用数据挖掘集群
阅读量:6416 次
发布时间:2019-06-23

本文共 4856 字,大约阅读时间需要 16 分钟。

Hive是facebook开源的一个非常伟大的工具,可以将hadoop中的数据用sql方式进行查询,比自己写map/reduce程序要方便很多。但是在实际使用中发现hive其实不够稳定,极少数情况会出现端口不响应或者进程丢失的问题,所以考虑将hive做成负载均衡的方式。或者更严格的说,叫做失效备份,避免出现某台个别的服务器连接数过大造成的端口不响应或者服务器故障造成无法查询。
前提是有2-3台服务器作为hive server,一台放置HAProxy。
一、安装配置HAProxy
#wget http://haproxy.1wt.eu/download/1.4/src/haproxy-1.4.20.tar.gz
#tar zxf haproxy-1.4.20.tar.gz
#mv haproxy-1.4.20 /opt/modules/haproxy
#cd /opt/modules/haproxy
#make TARGET=linux26
这样就编译完成了。
然后来配置HAProxy为四层代理,仅作为端口转发器使用。
global
                daemon
                nbproc 1
                pidfile /var/run/haproxy.pid
                ulimit-n 65535
####################
defaults
                mode tcp                         #mode { tcp|http|health },tcp 表示4层,http表示7层(对我们没用),health仅作为健康检查使用
                retries 2                          #尝试2次失败则从集群摘除
                option redispatch              #如果失效则强制转换其他服务器
                option abortonclose           #连接数过大自动关闭
                maxconn 1024                  #最大连接数
                timeout connect 1d           #连接超时时间,重要,hive查询数据能返回结果的保证
                timeout client 1d               #同上
                timeout server 1d              #同上
                timeout check 2000           #健康检查时间
                log 127.0.0.1 local0 err #[err warning info debug]
################
listen    admin_stats                           #定义管理界面
                bind 0.0.0.0:1090               #管理界面访问IP和端口
                mode http                         #管理界面所使用的协议
                maxconn 10
       #最大连接数
                stats refresh 30s                #30秒自动刷新
                stats uri /                         #访问url
                stats realm Hive\ Haproxy    #验证窗口提示
                stats auth admin:123456     #401验证用户名密码
########hive1#################
listen hive
#hive后端定义
                bind 0.0.0.0:10000               #ha作为proxy所绑定的IP和端口
                mode tcp                            #以4层方式代理,重要
                balance leastconn                #调度算法 'leastconn' 最少连接数分配,或者 'roundrobin',轮询分配
                maxconn 1024                      #最大连接数
                server hive_215 192.168.1.49:10000 check inter 180000 rise 1 fall 2
#释义:server 主机代名(你自己能看懂就行),IP:端口 每180000毫秒检查一次。也就是三分钟。
#hive每有10000端口的请求就会创建一个log,设置短了,/tmp下面会有无数个log文件,删不完。
                server hive_216 192.168.1.50:10000 check inter 180000 rise 1 fall 2
#同上,另外一台服务器。
用haproxy -f haproxy.conf启动。
比较重要的其实是timeout的前三个选项的时间定义,因
为hive运算请求时间会比较长,通常都会超过一个tcp包正常的session响应时间。所以,hive需要一个长连接来等待数据返回,在TCP里面应该是TIME_WAIT我记得。如果设置太短了,SQL可以提交成功,但是你将不会获得任何返回值。因为被提交的session已经关闭了。所以普通的tcp代理转发并不适合hive的应用。这点在做定时任务的时候尤其重要,你可能会提交一个任务,但是不会获得任何返回,又查不出任何结果,就很头疼。作为hive的任务开发者,不了解tcp协议的工作情况,作为运维又不了解hive,所以如果这里出了错误,会很难发现。我写的1d是1天的意思,正常的话,当hive执行完毕,会自己发送关闭TCP的指令。这点倒是不用太担心累计连接数超过限定。
1d标志1天,类似的还有1h表示1小时,1s表示1秒,1ms表示1ms。
另外配合我之前写过的python daemon程序,稍加修改,用来监控hive的server进程,就基本可保证hive server集群的高可用了。
目前这个高可用方案在公司内部的phpHiveAdmin中已经可以使用了,查询和结果获取正常无误,thrift连接IP和端口只要指向到Haproxy的IP和端口就可以了。
二、hive服务器端监控程序python
只要检测到hive进程没有或者端口消失,均重新启动hive
#!/usr/bin/env python
import sys, os, time, atexit, string
from signal import SIGTERM
class Daemon:
  def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
    self.stdin = stdin
    self.stdout = stdout
    self.stderr = stderr
    self.pidfile = pidfile
  
  def _daemonize(self):
    try:
      pid = os.fork()
      if pid > 0:
        sys.exit(0)
    except OSError, e:
      sys.stderr.write('fork #1 failed: %d (%s)\n' % (e.errno, e.strerror))
      sys.exit(1)
  
    os.chdir("/")
    os.setsid()
    os.umask(0)
  
    try:
      pid = os.fork()
      if pid > 0:
        sys.exit(0)
    except OSError, e:
      sys.stderr.write('fork #2 failed: %d (%s)\n' % (e.errno, e.strerror))
      sys.exit(1)
  
    sys.stdout.flush()
    sys.stderr.flush()
    si = file(self.stdin, 'r')
    so = file(self.stdout, 'a+')
    se = file(self.stderr, 'a+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())
  
    atexit.register(self.delpid)
    pid = str(os.getpid())
    file(self.pidfile,'w+').write('%s\n' % pid)
  
  def delpid(self):
    os.remove(self.pidfile)
  def start(self):
    try:
      pf = file(self.pidfile,'r')
      pid = int(pf.read().strip())
      pf.close()
    except IOError:
      pid = None
  
    if pid:
      message = 'pidfile %s already exist. Daemon already running?\n'
      sys.stderr.write(message % self.pidfile)
      sys.exit(1)
    
    self._daemonize()
    self._run()
  def stop(self):
    try:
      pf = file(self.pidfile,'r')
      pid = int(pf.read().strip())
      pf.close()
    except IOError:
      pid = None
  
    if not pid:
      message = 'pidfile %s does not exist. Daemon not running?\n'
      sys.stderr.write(message % self.pidfile)
      return
    try:
      while 1:
        os.kill(pid, SIGTERM)
        time.sleep(0.1)
        os.system("kill -9 `ps -ef | grep java | grep hive | grep -v 'grep' | awk '{print $2}'`")
    except OSError, err:
      err = str(err)
      if err.find('No such process') > 0:
        if os.path.exists(self.pidfile):
          os.remove(self.pidfile)
      else:
        print str(err)
        sys.exit(1)
  def restart(self):
    self.stop()
    self.start()
  def _run(self):
    while True:
      datanode = os.popen('ps -fe | grep "java" | grep "hive" | grep -v "grep" | wc -l').read().strip()
      if datanode == '0':
        os.system('export HADOOP_HOME=/opt/modules/hadoop/hadoop-0.20.203.0; export JAVA_HOME=/usr/java/jdk1.6.0_21; /opt/modules/hive/hive-0.7.1/bin/hive --service hiveserver 10001 &')
      time.sleep(2)
if __name__ == '__main__':
  daemon = Daemon('/tmp/watch_process.pid')
  if len(sys.argv) == 2:
    if 'start' == sys.argv[1]:
      daemon.start()
    elif 'stop' == sys.argv[1]:
      daemon.stop()
    elif 'restart' == sys.argv[1]:
      daemon.restart()
    else:
      print 'Unknown command'
      sys.exit(2)
    sys.exit(0)
  else:
    print 'usage: %s start|stop|restart' % sys.argv[0]
    sys.exit(2)
给可执行权限,然后用./daemon.py start启动,每两秒检测一次进程和端口。
2012-06-01
修改监控代码

转载地址:http://rssra.baihongyu.com/

你可能感兴趣的文章
C#预处理器指令【转】
查看>>
adb上使用cp/mv命令的替代方法(failed on '***' - Cross-device link解决方法)
查看>>
C++标准库简介、与STL的关系。
查看>>
Spring Boot 3 Hibernate
查看>>
查询EBS请求日志的位置和名称
查看>>
大型机、小型机、x86服务器的区别
查看>>
JVM调优总结:调优方法
查看>>
J2EE十三个规范小结
查看>>
算法(第四版)C#题解——2.1
查看>>
网关支付、银联代扣通道、快捷支付、银行卡支付分别是怎么样进行支付的?...
查看>>
大数据开发实战:Stream SQL实时开发一
查看>>
C++返回引用的函数例程
查看>>
C语言可变参数,参数传递
查看>>
你若安好便是晴天_百度百科
查看>>
Linux iptables 开放Mysql端口允许远程访问
查看>>
Mathematica 汉化教程
查看>>
JQuery EasyUI 读取设置input
查看>>
详解Java解析XML的四种方法(转载)
查看>>
32位系统win2008+mssql2008 6G内存折腾纪实
查看>>
3dmax快捷键
查看>>