`

分布式日志

阅读更多

 

最近完成一个简单的日志管理系统,拿出来跟大家分享一下!


主要实现的功能:

1、支持动态修改配置

2、实现统一的配置管理

3、支持文件输出、hbase输出、mongodb输出


基于以上三点功能,我们下面详细说明


1、支持动态修改配置

说到支持这个功能,有个同事认为没有这个必要,他的观点是log4j的配置不需要经常变动,不需要支持这样的功能;本人的观点是“配置可以进行统一管理,而且正式机跟测试机的log4j的配置肯定会有一些差异”,因此这个功能是必须的。


下面说一下实现

通过log4j提供的PropertyConfigurator类可以非常轻松的实现这个功能

代码如下

 

 

public class Log4jConfigListener {

 private  String log4jPath;

 /**
 * @param log4jPath the log4jPath to set
 */
 public void setLog4jPath(String log4jPath) {
 this.log4jPath = log4jPath;
 }


 /**
 * 装载log4j配置文件
* 
 * @author mrh
 * @DATE 2011-5-28
 */
 public void load() {
 // String path="config/log4j.properties";
 System.out.println("log4j configfile path=" + log4j.toString());
 PropertyConfigurator.configureAndWatch(log4j.toString(), 1000);// 间隔特定时间,检测文件是否修改,自动重新读取配置
}
}

 

 Spring中的配置

 

<bean class="com.jl.net.log4j.config.Log4jConfigListener" init-method="load">
  <property name="log4jPath" value="WEB-INF/classes/log4j/" /> <!-- log4j configuration location injected through setLog4jPath; NOTE(review): this value looks like a local directory rather than a zookeeper dataId, confirm it points at the properties file -->
 </bean>

 

2、统一配置管理

统一配置管理,是通过zookeeper实现的,配置形式与log4j.properties配置一致;

配置如下

 

# Root logger: INFO and above go to both the console and the HBase appender.
log4j.rootLogger=INFO,console,HbaseAppender

#console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}\:%L - %m%n
#hbase
log4j.appender.HbaseAppender=com.jl.net.log4j.hbase.HbaseAppender
log4j.appender.HbaseAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.HbaseAppender.sysName=tuid
# "Y" asks the appender to record the local server IP with every log row.
log4j.appender.HbaseAppender.serverIp=Y
# NOTE(review): the HbaseAppender source shown in this article only exposes setZookeeper ("host[,host...]:port");
# confirm that the deployed appender really has zookeeper_ip / zookeeper_port setters.
log4j.appender.HbaseAppender.zookeeper_ip=10.1.18.100,10.1.18.103,10.1.18.102
log4j.appender.HbaseAppender.zookeeper_port=2181
# Milliseconds are "SSS"; lower-case "sss" would print the seconds a second time, zero-padded to three digits.
log4j.appender.HbaseAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} %c.%t:%L %-5p %x  %m%n


Spring中的配置


<bean class="com.jl.net.log4j.config.Log4jConfigListener" init-method="load">
  <property name="log4jPath" value="cfg/jlcloud.uid.log4j" /> <!-- dataId of the unified configuration managed through zookeeper -->
 </bean>

3、支持文件、hbase、mongodb的输出

log4j本身就支持文件输出,mongodb的输出需要借助log4mongo-java这个项目,maven的坐标是

 

<!-- log4mongo-java: the project the article relies on for MongoDB log output; declared as an optional dependency -->
<dependency>
   <groupId>org.log4mongo</groupId>
   <artifactId>log4mongo-java</artifactId>
   <version>0.7.4</version>
   <optional>true</optional>
  </dependency>

 

 

下面介绍一下如何实现hbase的输出,通过继承log4j的AppenderSkeleton类,同时实现Runnable接口,来实现:

代码如下

 

package com.yck.worm.hbase;

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

import com.yck.worm.logging.WLogger;



/**
 * Log4j appender that writes log events to an HBase table.
 *
 * <p>{@link #append(LoggingEvent)} only enqueues events; a single scheduled
 * task (this object's {@link #run()}) drains the queue every {@code period}
 * milliseconds and writes the drained events to HBase in one batch. The HBase
 * connection is created lazily by that task on its first execution.
 *
 * <p>If HBase cannot be initialised the flush task is cancelled and the
 * appender stops accepting events, so the queue cannot grow without a consumer.
 *
 * @author mrh
 */
public class HbaseAppender extends AppenderSkeleton implements Runnable {

	private static final WLogger LOGGER = WLogger.getLogger(HbaseAppender.class);

	/** Timestamp pattern used both in the row key and in the "datetime" column. */
	private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss SSS";

	/** Flush threshold; a negative value means "flush whatever is queued on every tick". */
	private int batchSize = -1;
	/** Delay between two flush runs, in milliseconds. */
	private int period = 1000;
	/** Name of the HBase table that receives the logs. */
	private String hbLogName = "jl_logs";
	/** Column family that holds all log columns. */
	private String hbLogFamily = "logs";
	/** Events waiting to be written; created eagerly so append() never sees null. */
	private final Queue<LoggingEvent> loggingEvents = new ConcurrentLinkedQueue<LoggingEvent>();
	private ScheduledExecutorService executor;
	private ScheduledFuture<?> task;
	private Configuration conf;
	private HTableInterface htable;
	private HBaseAdmin admin;

	private HConnection connection;

	/** Zookeeper address in the form {@code host[,host...]:port}. */
	private String zookeeper;

	private String sysName;

	/**
	 * If configured as "Y" the local server IP is resolved at activation time
	 * and written with every log row.
	 */
	private String serverIp;

	/** Set once the appender has given up (activation or HBase init failed); read by logging threads. */
	private volatile boolean disabled;

	/**
	 * Log4j initialisation hook: resolves the server IP if requested and starts
	 * the scheduled flush task on a dedicated, named thread.
	 */
	@Override
	public void activateOptions() {
		try {
			super.activateOptions();
			// Resolve before scheduling so the flush task never observes the "Y" placeholder.
			resolveServerIp();
			executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HbaseAppender"));
			// The task stops being rescheduled if run() lets an exception escape.
			task = executor.scheduleWithFixedDelay(this, period, period, TimeUnit.MILLISECONDS);
			LOGGER.info("ActivateOptions ok!");
		} catch (Exception e) {
			// Without a flush task nothing would ever drain the queue.
			disabled = true;
			if (executor != null) {
				executor.shutdown();
			}
			LOGGER.error("Error during activateOptions: ", e);
		}
	}

	/**
	 * Replaces the "Y" placeholder in {@code serverIp} with the local host
	 * address, or with {@code null} (column omitted) when it cannot be resolved.
	 */
	private void resolveServerIp() {
		if (!"Y".equalsIgnoreCase(this.serverIp)) {
			return;
		}
		try {
			this.serverIp = InetAddress.getLocalHost().getHostAddress();
		} catch (Exception e) {
			this.serverIp = null;
			LOGGER.error("Unable to resolve local host address, serverIp will not be recorded", e);
		}
	}

	/**
	 * Creates the HBase configuration, makes sure the log table exists and
	 * opens the table handle. Fields are only assigned once every step has
	 * succeeded, so a failed attempt never leaves a half-initialised state.
	 *
	 * @return {@code true} if HBase is ready for writes, {@code false} if
	 *         initialisation failed (the appender is then disabled)
	 */
	private boolean initHbase() {
		if (conf != null && htable != null) {
			return true;
		}
		HBaseAdmin newAdmin = null;
		HConnection newConnection = null;
		try {
			LOGGER.info("initHbase ... zookeeper address is " + this.zookeeper);
			if (this.zookeeper == null || "".equals(this.zookeeper)) {
				throw new IllegalArgumentException("the zookeeper address for jlog4j's hadoop initial can not be null!");
			}
			String[] address = this.zookeeper.split(":");
			if (address.length < 2) {
				throw new IllegalArgumentException(
						"the zookeeper address must look like host[,host...]:port but was " + this.zookeeper);
			}
			Configuration newConf = HBaseConfiguration.create();
			newConf.set(HConstants.ZOOKEEPER_QUORUM, address[0]);
			newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(address[1].trim()));

			newAdmin = new HBaseAdmin(newConf);
			if (!newAdmin.tableExists(Bytes.toBytes(hbLogName))) {
				HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(hbLogName));
				tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes(hbLogFamily)));
				newAdmin.createTable(tableDescriptor);
			}
			newConnection = HConnectionManager.createConnection(newConf);
			HTableInterface newTable = newConnection.getTable(Bytes.toBytes(hbLogName));

			this.conf = newConf;
			this.admin = newAdmin;
			this.connection = newConnection;
			this.htable = newTable;
			LOGGER.info("Init Hbase success !");
			return true;
		} catch (Exception e) {
			closeConnection(newConnection);
			closeAdmin(newAdmin);
			giveUp();
			LOGGER.error("Init Hbase fail !", e);
			return false;
		}
	}

	/** Cancels the flush task and stops accepting events after an unrecoverable failure. */
	private void giveUp() {
		disabled = true;
		if (task != null) {
			task.cancel(false);
		}
		if (executor != null) {
			executor.shutdown();
		}
		// Nothing will drain the queue any more, so release what is already in it.
		loggingEvents.clear();
	}

	/**
	 * Periodic flush: initialises HBase on first use, then writes the queued
	 * events when the queue is non-empty and either no batch size is set or
	 * the queue holds more than {@code batchSize} events.
	 */
	@Override
	public void run() {
		try {
			if (!initHbase()) {
				return;
			}
			int pending = loggingEvents.size();
			if (pending > 0 && (this.batchSize < 0 || pending > this.batchSize)) {
				this.output();
			}
		} catch (Exception e) {
			LOGGER.error("HbaseAppender Error run ", e);
		}
	}

	/**
	 * Drains the queue and writes all drained events to HBase in one batch.
	 * An event that cannot be converted is logged and skipped.
	 *
	 * @throws IOException if the batch write to HBase fails
	 */
	private void output() throws IOException {
		List<Put> logs = new ArrayList<Put>();
		// Only used on the flush thread, so one formatter per batch is enough.
		SimpleDateFormat sdf = new SimpleDateFormat(TIMESTAMP_PATTERN);
		byte[] family = Bytes.toBytes(hbLogFamily);
		LoggingEvent event;
		while ((event = loggingEvents.poll()) != null) {
			try {
				logs.add(toPut(event, sdf, family));
			} catch (Exception e) {
				LOGGER.error("Error logging put ", e);
			}
		}
		if (!logs.isEmpty()) {
			htable.put(logs);
		}
	}

	/**
	 * Converts one log event into an HBase row.
	 *
	 * <p>Row key: {@code [sysName#]loggerName#level#timestamp#uuid}; the random
	 * UUID suffix keeps events with the same logger, level and millisecond apart.
	 *
	 * @param event  the event to convert
	 * @param sdf    formatter for the event timestamp
	 * @param family column family bytes
	 * @return the row to write
	 */
	private Put toPut(LoggingEvent event, SimpleDateFormat sdf, byte[] family) {
		String timestamp = sdf.format(new Date(event.getTimeStamp()));
		String level = String.valueOf(event.getLevel());

		StringBuilder key = new StringBuilder();
		if (this.sysName != null) {
			key.append(this.sysName).append("#");
		}
		key.append(event.getLoggerName()).append("#");
		key.append(level).append("#");
		key.append(timestamp).append("#");
		key.append(UUID.randomUUID().toString().replace("-", ""));

		// Bytes.toBytes encodes as UTF-8, independent of the platform default charset.
		Put log = new Put(Bytes.toBytes(key.toString()));
		if (this.sysName != null) {
			log.add(family, Bytes.toBytes("sysName"), Bytes.toBytes(this.sysName));
		}
		if (this.serverIp != null) {
			log.add(family, Bytes.toBytes("serverIp"), Bytes.toBytes(this.serverIp));
		}
		log.add(family, Bytes.toBytes("LoggerName"), Bytes.toBytes(event.getLoggerName()));
		log.add(family, Bytes.toBytes("level"), Bytes.toBytes(level));
		log.add(family, Bytes.toBytes("datetime"), Bytes.toBytes(timestamp));
		// A null message must not cost us the whole row; the column is simply left out.
		String message = event.getRenderedMessage();
		if (message != null) {
			log.add(family, Bytes.toBytes("message"), Bytes.toBytes(message));
		}
		return log;
	}

	/**
	 * Receives a log event from log4j and queues it for the next flush.
	 *
	 * @param loggingEvent the event to queue
	 */
	@Override
	protected void append(LoggingEvent loggingEvent) {
		if (disabled) {
			// No flush task is running; queueing would only leak memory.
			return;
		}
		try {
			populateEvent(loggingEvent);
			loggingEvents.add(loggingEvent);
		} catch (Exception e) {
			LOGGER.error("Error populating event and adding to queue", e);
		}
	}

	/**
	 * Touches the event's lazily computed values on the logging thread, before
	 * the event is handed over to the flush thread.
	 *
	 * @param event the event to populate
	 */
	protected void populateEvent(LoggingEvent event) {
		event.getThreadName();
		event.getRenderedMessage();
		event.getNDC();
		event.getMDCCopy();
		event.getThrowableStrRep();
		event.getLocationInformation();
	}

	/**
	 * Stops the flush task and releases the HBase resources. Each resource is
	 * closed independently so that one failure does not leak the others, and
	 * the table is closed before the connection it was obtained from.
	 */
	@Override
	public void close() {
		if (this.closed) {
			return;
		}
		// Lets AppenderSkeleton reject events that arrive after shutdown.
		this.closed = true;
		if (this.task != null) {
			this.task.cancel(false);
		}
		if (this.executor != null) {
			this.executor.shutdown();
		}
		closeTable(this.htable);
		closeAdmin(this.admin);
		closeConnection(this.connection);
	}

	/** Closes the table handle, logging instead of propagating any failure. */
	private void closeTable(HTableInterface table) {
		if (table == null) {
			return;
		}
		try {
			table.close();
		} catch (Exception e) {
			LOGGER.error("Error close ", e);
		}
	}

	/** Closes the admin client, logging instead of propagating any failure. */
	private void closeAdmin(HBaseAdmin hbaseAdmin) {
		if (hbaseAdmin == null) {
			return;
		}
		try {
			hbaseAdmin.close();
		} catch (Exception e) {
			LOGGER.error("Error close ", e);
		}
	}

	/** Closes the connection, logging instead of propagating any failure. */
	private void closeConnection(HConnection hConnection) {
		if (hConnection == null) {
			return;
		}
		try {
			hConnection.close();
		} catch (Exception e) {
			LOGGER.error("Error close ", e);
		}
	}

	@Override
	public boolean requiresLayout() {
		return true;
	}

	/**
	 * Sets the flush threshold.
	 *
	 * @param batchSize queue size that must be exceeded before a flush;
	 *                  negative flushes on every tick
	 */
	public void setBatchSize(int batchSize) {
		this.batchSize = batchSize;
	}

	/**
	 * Sets the interval of the scheduled flush task.
	 *
	 * @param period delay between runs, in milliseconds
	 */
	public void setPeriod(int period) {
		this.period = period;
	}

	/**
	 * Sets the name of the HBase table that stores the logs.
	 *
	 * @param hbLogName table name
	 */
	public void setHbLogName(String hbLogName) {
		this.hbLogName = hbLogName;
	}

	/**
	 * Sets the column family of the log table.
	 *
	 * @param hbLogFamily column family name
	 */
	public void setHbLogFamily(String hbLogFamily) {
		this.hbLogFamily = hbLogFamily;
	}

	/**
	 * @param zookeeper the zookeeper address, {@code host[,host...]:port}
	 */
	public void setZookeeper(String zookeeper) {
		this.zookeeper = zookeeper;
	}

	/**
	 * @param sysName the sysName to set
	 */
	public void setSysName(String sysName) {
		this.sysName = sysName;
	}

	/**
	 * @param serverIp the serverIp to set
	 */
	public void setServerIp(String serverIp) {
		this.serverIp = serverIp;
	}
}

 

 

收工,有些粗糙,将就吧!

 

附带源码,见附件

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics