flume - Component Object Initialization (2)


Overview

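 In the previous post, "flume - Startup Process Analysis (1)", we covered how flume loads the configuration of its components and how it starts them. We skipped component initialization, though: the step that builds the components from the configuration. This post fills that gap.
 By the end of it you should understand how the parsed configuration is turned into the three components: source, channel and sink.
 A later post will take one example per component and walk through its startup, so that initialization and startup are both covered. This post focuses on initialization.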

Configuration loading

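  • First, a quick review of configuration loading. The comments in the source show that parsing happens in two steps. step1 parses the properties. step2 parses the step1 result again to produce the configuration specific to each source, channel and sink.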
public FlumeConfiguration(Map<String, String> properties) {
    agentConfigMap = new HashMap<>();
    errors = new LinkedList<>();
    
    // step1: parse the properties and build the raw configuration of sources, channels and sinks
    for (Entry<String, String> entry : properties.entrySet()) {
      if (!addRawProperty(entry.getKey(), entry.getValue())) {
        LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue());
      }
    }
    
    // step2: parse the step1 result further, according to the source, channel and sink attributes
    validateConfiguration();
  }
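To make step1 more concrete, the sketch below groups flat properties such as `a1.sources.r1.type = netcat` by agent and by component. It is a simplified model, not flume's addRawProperty. The class `RawPropertyParser`, its fields, and the nested maps that stand in for Context are all hypothetical. It also handles only the `sources`/`channels`/`sinks` prefixes, while flume knows about sink groups, config filters and more.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of step1: group "agent.kind.name.key = value"
// properties into per-component maps. Not flume's actual implementation.
class RawPropertyParser {
  // agent -> "sources"/"channels"/"sinks" -> component name -> (key -> value)
  final Map<String, Map<String, Map<String, Map<String, String>>>> agents = new HashMap<>();
  // agent -> kind -> space-delimited component list, e.g. "r1 r2"
  final Map<String, Map<String, String>> componentLists = new HashMap<>();

  boolean addRawProperty(String key, String value) {
    // limit 4 keeps dotted property keys intact, e.g. "selector.type"
    String[] parts = key.split("\\.", 4);
    if (parts.length < 2) {
      return false; // not of the form agent.kind[...]
    }
    String agent = parts[0];
    String kind = parts[1];
    if (!kind.equals("sources") && !kind.equals("channels") && !kind.equals("sinks")) {
      return false; // ignored, like the LOGGER.warn branch in the constructor
    }
    if (parts.length == 2) {
      // e.g. "a1.sources = r1 r2": the list of component names
      componentLists.computeIfAbsent(agent, a -> new HashMap<>()).put(kind, value.trim());
      return true;
    }
    if (parts.length < 4) {
      return false; // "a1.sources.r1" without a property key
    }
    // e.g. "a1.sources.r1.type = netcat": one property of component r1
    agents.computeIfAbsent(agent, a -> new HashMap<>())
        .computeIfAbsent(kind, k -> new HashMap<>())
        .computeIfAbsent(parts[2], n -> new HashMap<>())
        .put(parts[3], value);
    return true;
  }
}
```

With these rules, `a1.channels = c1` becomes the list string `c1`, and `a1.channels.c1.capacity = 1000` lands in the map for channel c1. That map plays roughly the role of a Context in channelContextMap.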
  • Compare this with flume's AgentConfiguration object. I have marked which fields are initialized in step1 and which in step2, which should make the split clear at a glance.
public static class AgentConfiguration {

    // initialized in step1
    private final String agentName;
    private String configFilters;
    private String sources;
    private String sinks;
    private String channels;
    private String sinkgroups;

    // initialized in step2
    private final Map<String, ComponentConfiguration> sourceConfigMap;
    private final Map<String, ComponentConfiguration> sinkConfigMap;
    private final Map<String, ComponentConfiguration> channelConfigMap;
    private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
    private final Map<String, ComponentConfiguration> configFilterConfigMap;

    // initialized in step1
    private Map<String, Context> configFilterContextMap;
    private Map<String, Context> sourceContextMap;
    private Map<String, Context> sinkContextMap;
    private Map<String, Context> channelContextMap;
    private Map<String, Context> sinkGroupContextMap;

    // initialized in step2
    private Set<String> sinkSet;
    private Set<String> configFilterSet;
    private Set<String> sourceSet;
    private Set<String> channelSet;
    private Set<String> sinkgroupSet;

    private final List<FlumeConfigurationError> errorList;
    private List<ConfigFilter> configFiltersInstances;
    private Map<String, Pattern> configFilterPatternCache;
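  • Next, step2 (validateConfiguration) in more detail, because it initializes the variables that component initialization relies on later. agentConfigMap holds the AgentConfiguration of each agent. Each AgentConfiguration is validated in turn through aconf.isValid(), so that is where we go next.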
private void validateConfiguration() {
    Set<Entry<String, AgentConfiguration>> entries = agentConfigMap.entrySet();
    Iterator<Entry<String, AgentConfiguration>> it = entries.iterator();

    while (it.hasNext()) {
      Entry<String, AgentConfiguration> next = it.next();
      String agentName = next.getKey();
      AgentConfiguration aconf = next.getValue();

      // aconf is the agent's configuration; validate the whole of it
      if (!aconf.isValid()) {
        LOGGER.warn("Agent configuration invalid for agent '{}'. It will be removed.", agentName);
        addError(agentName, AGENT_CONFIGURATION_INVALID, ERROR);
        it.remove();
      }
      LOGGER.debug("Channels:{}\n", aconf.channels);
      LOGGER.debug("Sinks {}\n", aconf.sinks);
      LOGGER.debug("Sources {}\n", aconf.sources);
    }
  }
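validateConfiguration drops an invalid agent with `it.remove()`. It does not call `agentConfigMap.remove(...)` inside the loop, because removing through the map while iterating a HashMap can throw ConcurrentModificationException. The minimal sketch below shows the same pattern. The `Predicate` stands in for `aconf.isValid()`, and `AgentPruner` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of the validateConfiguration pattern: drop entries that fail
// validation while iterating, using Iterator.remove().
class AgentPruner {
  static <V> void removeInvalid(Map<String, V> agentConfigMap, Predicate<V> isValid) {
    Iterator<Map.Entry<String, V>> it = agentConfigMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, V> next = it.next();
      if (!isValid.test(next.getValue())) {
        System.out.println("Agent configuration invalid for agent '"
            + next.getKey() + "'. It will be removed.");
        it.remove(); // the safe way to remove an entry during iteration
      }
    }
  }
}
```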
  • aconf.isValid() initializes configFilterSet, channelSet, sourceSet, sinkSet and sinkgroupSet, which confirms that these are the variables set in step2. Next we follow validateChannels, validateSources and validateSinks. These three matter most because channel, source and sink are flume's core components.
private boolean isValid() {
      LOGGER.debug("Starting validation of configuration for agent: {}", agentName);
      if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
        LOGGER.debug("Initial configuration: {}", getPrevalidationConfig());
      }

      configFilterSet = validateConfigFilterSet();
      createConfigFilters();
      runFiltersThroughConfigs();

      // Make sure that at least one channel is specified
      if (channels == null || channels.trim().isEmpty()) {
        LOGGER.warn(
            "Agent configuration for '{}' does not contain any channels. Marking it as invalid.",
            agentName
        );
        addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
        return false;
      }

      // Split the channel names; the regex "\\s+" matches runs of whitespace
      channelSet = new HashSet<>(Arrays.asList(channels.split("\\s+")));

      // Core step: validate each channel
      channelSet = validateChannels(channelSet);
      if (channelSet.isEmpty()) {
        LOGGER.warn(
            "Agent configuration for '{}' does not contain any valid channels. " +
                "Marking it as invalid.",
            agentName
        );
        addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
        return false;
      }

      // Core logic: validate sources and sinks against the valid channels, then the sink groups
      sourceSet = validateSources(channelSet);
      sinkSet = validateSinks(channelSet);
      sinkgroupSet = validateGroups(sinkSet);

      // If no sources or sinks are present, then this is invalid
      if (sourceSet.isEmpty() && sinkSet.isEmpty()) {
        LOGGER.warn(
            "Agent configuration for '{}' has no sources or sinks. Will be marked invalid.",
            agentName
        );
        addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, ERROR);
        addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, ERROR);
        return false;
      }

      // Now rewrite the sources/sinks/channels

      this.configFilters = getSpaceDelimitedList(configFilterSet);
      sources = getSpaceDelimitedList(sourceSet);
      channels = getSpaceDelimitedList(channelSet);
      sinks = getSpaceDelimitedList(sinkSet);
      sinkgroups = getSpaceDelimitedList(sinkgroupSet);

      if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
        LOGGER.debug("Post validation configuration for {}", agentName);
        LOGGER.debug(getPostvalidationConfig());
      }

      return true;
    }
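isValid turns each space-delimited list, such as `channels = c1 c2`, into a Set with `split("\\s+")`. It then narrows the set to the names that passed validation and writes it back with getSpaceDelimitedList. The sketch below shows that round trip. `NameListUtil` and `keepValid` are hypothetical. This getSpaceDelimitedList is a simplified stand-in for flume's private helper, not a copy of it, and the TreeSet is there only to make the output order predictable.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Sketch of the split -> filter -> rejoin round trip used in isValid().
class NameListUtil {
  static Set<String> split(String list) {
    // Trim first: "  c1 c2".split("\\s+") would yield an empty first token.
    return new TreeSet<>(Arrays.asList(list.trim().split("\\s+")));
  }

  static Set<String> keepValid(Set<String> names, Set<String> valid) {
    Set<String> result = new TreeSet<>(names);
    result.retainAll(valid); // same idea as channelSet.retainAll(tempchannelSet)
    return result;
  }

  static String getSpaceDelimitedList(Set<String> names) {
    return String.join(" ", names); // sorted, because names is a TreeSet here
  }
}
```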


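  • Now channel validation, following validateChannels. Channels with a dedicated configuration class go into channelConfigMap, and channels without one go into channelContextMap. ComponentConfigurationFactory.create builds the configuration object from the channel's type (the channel types are listed in the enum below). If the specified configuration class does not exist, create still returns a ChannelConfiguration. That object is marked isNotFoundConfigClass, so the channel ends up in channelContextMap.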
private Set<String> validateChannels(Set<String> channelSet) {

      Iterator<String> iter = channelSet.iterator();
      Map<String, Context> newContextMap = new HashMap<>();
      ChannelConfiguration conf = null;
     
      // Examine each channel
      while (iter.hasNext()) {
        String channelName = iter.next();
        // At this point channelContextMap holds the raw configuration of every channel
        Context channelContext = channelContextMap.get(channelName);
        // Context exists in map.
        if (channelContext != null) {
          // Normally the type is a known one such as file, so the else branch below is taken
          ChannelType chType = getKnownChannel(channelContext.getString(
              BasicConfigurationConstants.CONFIG_TYPE));
          boolean configSpecified = false;
          String config = null;
          if (chType == null) {
            // ... (less important code omitted)
          } else {
            config = chType.toString().toUpperCase(Locale.ENGLISH);
            configSpecified = true;
          }

          try {
            // Create the ChannelConfiguration that matches the flume ChannelType
            conf =
                (ChannelConfiguration) ComponentConfigurationFactory.create(
                    channelName, config, ComponentType.CHANNEL);

            // Initialize conf from the original channelContext
            if (conf != null) {
              conf.configure(channelContext);
            }

            // Channels without a dedicated configuration class go into newContextMap
            if ((configSpecified && conf.isNotFoundConfigClass()) ||
                !configSpecified) {
              newContextMap.put(channelName, channelContext);
            } else if (configSpecified) {
              // Channels with a dedicated configuration class go into channelConfigMap
              channelConfigMap.put(channelName, conf);
            }
            if (conf != null) {
              errorList.addAll(conf.getErrors());
            }
          } catch (ConfigurationException e) {
            // ... (exception handling omitted)
          }
        }
      }

      // channelContextMap now holds the channels without a dedicated configuration class; channelConfigMap holds those with one
      channelContextMap = newContextMap;

      Set<String> tempchannelSet = new HashSet<String>();
      tempchannelSet.addAll(channelConfigMap.keySet());
      tempchannelSet.addAll(channelContextMap.keySet());
      channelSet.retainAll(tempchannelSet);
      return channelSet;
    }
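The branch at the end of the loop in validateChannels decides where each channel ends up. The sketch below isolates that routing rule. `ChannelSorter`, its nested `Conf` class and the plain maps are hypothetical simplifications of ChannelConfiguration, channelConfigMap and channelContextMap.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the routing rule in validateChannels(): a component whose
// configuration class was found goes to the "config" map; anything else
// keeps its raw properties in the "context" map.
class ChannelSorter {
  static final class Conf {
    final boolean notFoundConfigClass;
    Conf(boolean notFoundConfigClass) { this.notFoundConfigClass = notFoundConfigClass; }
  }

  final Map<String, Conf> configMap = new HashMap<>();
  final Map<String, Map<String, String>> contextMap = new HashMap<>();

  void route(String name, boolean configSpecified, Conf conf, Map<String, String> context) {
    if ((configSpecified && conf.notFoundConfigClass) || !configSpecified) {
      contextMap.put(name, context); // later: Configurables.configure(channel, context)
    } else {
      configMap.put(name, conf);     // later: Configurables.configure(channel, comp)
    }
  }
}
```

The condition `(configSpecified && conf.isNotFoundConfigClass()) || !configSpecified` is equivalent to `!configSpecified || conf.isNotFoundConfigClass()`. As a result, the original `else if (configSpecified)` is always true when it is reached, which is why the sketch uses a plain `else`.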
  • These are all the supported channel types. I won't cover them one by one here; each kind of channel will be analyzed later.
public enum ChannelType implements ComponentWithClassName {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");


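  • Next, source validation, following validateSources. Sources with a dedicated configuration class go into sourceConfigMap, and sources without one go into sourceContextMap. ComponentConfigurationFactory.create builds the configuration object from the source's type (the source types are listed in the enum below). If the specified configuration class does not exist, create still returns a SourceConfiguration. That object is marked isNotFoundConfigClass, so the source ends up in sourceContextMap.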
private Set<String> validateSources(Set<String> channelSet) {
      //Arrays.split() call will throw NPE if the sources string is empty
      if (sources == null || sources.isEmpty()) {
        LOGGER.warn("Agent configuration for '{}' has no sources.", agentName);
        addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, WARNING);
        return new HashSet<String>();
      }

      // Split the space-delimited source names
      Set<String> sourceSet =
          new HashSet<String>(Arrays.asList(sources.split("\\s+")));
      Map<String, Context> newContextMap = new HashMap<String, Context>();
      Iterator<String> iter = sourceSet.iterator();
      SourceConfiguration srcConf = null;
      
      // Parse the configuration of each source
      while (iter.hasNext()) {
        String sourceName = iter.next();
        Context srcContext = sourceContextMap.get(sourceName);
        String config = null;
        boolean configSpecified = false;
        if (srcContext != null) {

          // Look up the source type
          SourceType srcType = getKnownSource(srcContext.getString(
              BasicConfigurationConstants.CONFIG_TYPE));
          if (srcType == null) {
            config = srcContext.getString(
                CONFIG_CONFIG);
            if (config == null || config.isEmpty()) {
              config = "OTHER";
            } else {
              configSpecified = true;
            }
          } else {
            config = srcType.toString().toUpperCase(Locale.ENGLISH);
            configSpecified = true;
          }

          // Create the new source configuration
          try {
            // Possible reason the configuration can fail here:
            // Old component is configured directly using Context
            srcConf =
                (SourceConfiguration) ComponentConfigurationFactory.create(
                    sourceName, config, ComponentType.SOURCE);

            // Initialize the new srcConf from the original context
            if (srcConf != null) {
              srcConf.configure(srcContext);
              Set<String> channels = new HashSet<String>();
              if (srcConf.getChannels() != null) {
                channels.addAll(srcConf.getChannels());
              }
              channels.retainAll(channelSet);
              if (channels.isEmpty()) {
                throw new ConfigurationException(
                    "No Channels configured for " + sourceName);
              }
              srcContext.put(CONFIG_CHANNELS,
                  this.getSpaceDelimitedList(channels));
            }
            if ((configSpecified && srcConf.isNotFoundConfigClass()) ||
                !configSpecified) {
              newContextMap.put(sourceName, srcContext);
            } else if (configSpecified) {
              // Store the new configuration in sourceConfigMap
              sourceConfigMap.put(sourceName, srcConf);
            }
            if (srcConf != null) errorList.addAll(srcConf.getErrors());
          } catch (ConfigurationException e) {
            if (srcConf != null) errorList.addAll(srcConf.getErrors());
            iter.remove();
            LOGGER.warn(
                "Could not configure source  {} due to: {}",
                new Object[]{sourceName, e.getMessage(), e}
            );
          }
        } else {
          iter.remove();
          addError(sourceName, CONFIG_ERROR, ERROR);
          LOGGER.warn("Configuration empty for: {}.Removed.", sourceName);
        }
      }

      // validateComponent(sourceSet, sourceConfigMap, CLASS_SOURCE, ATTR_TYPE,
      // ATTR_CHANNELS);
      sourceContextMap = newContextMap;
      Set<String> tempsourceSet = new HashSet<String>();
      tempsourceSet.addAll(sourceContextMap.keySet());
      tempsourceSet.addAll(sourceConfigMap.keySet());
      sourceSet.retainAll(tempsourceSet);
      return sourceSet;
    }
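Compared with validateChannels, the extra work in validateSources is the channel check. The channels a source lists are intersected with the valid channelSet. An empty intersection rejects the source, and otherwise the surviving names are written back into the source's context. The sketch below covers only that step, with a few simplifications. `SourceChannelCheck` is hypothetical, and the channels are read straight from a `channels` key rather than through SourceConfiguration.getChannels(). IllegalArgumentException stands in for flume's ConfigurationException.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Sketch of the channel check in validateSources(). IllegalArgumentException
// stands in for flume's ConfigurationException.
class SourceChannelCheck {
  static void checkChannels(String sourceName, Map<String, String> srcContext,
                            Set<String> channelSet) {
    Set<String> channels = new TreeSet<>();
    String declared = srcContext.get("channels");
    if (declared != null && !declared.trim().isEmpty()) {
      channels.addAll(Arrays.asList(declared.trim().split("\\s+")));
    }
    channels.retainAll(channelSet); // drop channels that failed validation
    if (channels.isEmpty()) {
      throw new IllegalArgumentException("No Channels configured for " + sourceName);
    }
    // Rewrite the context so that later stages only see valid channels.
    srcContext.put("channels", String.join(" ", channels));
  }
}
```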
  • The source types supported by flume. Skim them for now; a few core ones will be analyzed later.
public enum SourceType implements ComponentWithClassName {
  OTHER(null),
  SEQ("org.apache.flume.source.SequenceGeneratorSource"),
  NETCAT("org.apache.flume.source.NetcatSource"),
  EXEC("org.apache.flume.source.ExecSource"),
  AVRO("org.apache.flume.source.AvroSource"),
  SYSLOGTCP("org.apache.flume.source.SyslogTcpSource"),
  MULTIPORT_SYSLOGTCP("org.apache.flume.source.MultiportSyslogTCPSource"),
  SYSLOGUDP("org.apache.flume.source.SyslogUDPSource"),
  SPOOLDIR("org.apache.flume.source.SpoolDirectorySource"),
  HTTP("org.apache.flume.source.http.HTTPSource"),
  THRIFT("org.apache.flume.source.ThriftSource"),
  JMS("org.apache.flume.source.jms.JMSSource"),
  TAILDIR("org.apache.flume.source.taildir.TaildirSource"),
  NETCATUDP("org.apache.flume.source.NetcatUdpSource");


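  • Next, sink validation, following validateSinks. Sinks with a dedicated configuration class go into sinkConfigMap, and sinks without one go into sinkContextMap. As with channels and sources, ComponentConfigurationFactory.create builds the configuration object from the sink's type (the sink types are listed in the enum below). If the specified configuration class does not exist, create still returns a SinkConfiguration. That object is marked isNotFoundConfigClass, so the sink ends up in sinkContextMap. One difference from sources is that a sink is bound to a single channel, and it is rejected if that channel is not in the active channel set.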
private Set<String> validateSinks(Set<String> channelSet) {
      // Preconditions.checkArgument(channelSet != null && channelSet.size() >
      // 0);
      Map<String, Context> newContextMap = new HashMap<String, Context>();
      Set<String> sinkSet;
      SinkConfiguration sinkConf = null;
      if (sinks == null || sinks.isEmpty()) {
        LOGGER.warn("Agent configuration for '{}' has no sinks.", agentName);
        addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, WARNING);
        return new HashSet<String>();
      } else {
        sinkSet =
            new HashSet<String>(Arrays.asList(sinks.split("\\s+")));
      }
      Iterator<String> iter = sinkSet.iterator();
      
      while (iter.hasNext()) {
        // Iterate over all sink names
        String sinkName = iter.next();
        Context sinkContext = sinkContextMap.get(sinkName.trim());
        if (sinkContext == null) {
          iter.remove();
          LOGGER.warn("no context for sink{}", sinkName);
          addError(sinkName, CONFIG_ERROR, ERROR);
        } else {
          String config = null;
          boolean configSpecified = false;
          SinkType sinkType = getKnownSink(sinkContext.getString(
              BasicConfigurationConstants.CONFIG_TYPE));
          if (sinkType == null) {
            config = sinkContext.getString(
                CONFIG_CONFIG);
            if (config == null || config.isEmpty()) {
              config = "OTHER";
            } else {
              configSpecified = true;
            }
          } else {
            config = sinkType.toString().toUpperCase(Locale.ENGLISH);
            configSpecified = true;
          }
          try {
            LOGGER.debug("Creating sink: {} using {}", sinkName, config);

            // Create the SinkConfiguration object
            sinkConf =
                (SinkConfiguration) ComponentConfigurationFactory.create(
                    sinkName, config, ComponentType.SINK);
            if (sinkConf != null) {
              // Initialize the sink configuration
              sinkConf.configure(sinkContext);

            }

            if (!channelSet.contains(sinkConf.getChannel())) {
              throw new ConfigurationException("Channel " +
                  sinkConf.getChannel() + " not in active set.");
            }
            if ((configSpecified && sinkConf.isNotFoundConfigClass()) ||
                !configSpecified) {
              newContextMap.put(sinkName, sinkContext);
            } else if (configSpecified) {
              // sinkConfigMap holds the sinks that have a dedicated configuration class
              sinkConfigMap.put(sinkName, sinkConf);
            }
            if (sinkConf != null) errorList.addAll(sinkConf.getErrors());
          } catch (ConfigurationException e) {
            iter.remove();
            if (sinkConf != null) errorList.addAll(sinkConf.getErrors());
            LOGGER.warn(
                "Could not configure sink  {} due to: {}",
                new Object[]{sinkName, e.getMessage(), e}
            );
          }
        }
        // Filter out any sinks that have invalid channel

      }

      // Replace sinkContextMap with the sinks that only have a raw context
      sinkContextMap = newContextMap;
      Set<String> tempSinkset = new HashSet<String>();
      tempSinkset.addAll(sinkConfigMap.keySet());
      tempSinkset.addAll(sinkContextMap.keySet());
      sinkSet.retainAll(tempSinkset);

      return sinkSet;
    }
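A sink names exactly one channel, and validateSinks rejects the sink when that channel is not in the active set. The sketch below keeps only that filtering plus the "no context" case. `SinkChannelFilter` is hypothetical. It reads the channel from a `channel` key in a plain map rather than through SinkConfiguration.getChannel().

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Sketch of the sink-specific rule in validateSinks(): each sink names exactly
// one channel, and sinks whose channel is not in the active set are dropped.
class SinkChannelFilter {
  static Set<String> validSinks(Set<String> sinkNames,
                                Map<String, Map<String, String>> sinkContextMap,
                                Set<String> channelSet) {
    Set<String> result = new TreeSet<>(sinkNames);
    Iterator<String> iter = result.iterator();
    while (iter.hasNext()) {
      String sinkName = iter.next();
      Map<String, String> ctx = sinkContextMap.get(sinkName.trim());
      if (ctx == null) {
        iter.remove(); // "no context for sink"
        continue;
      }
      String channel = ctx.get("channel"); // singular: a sink reads one channel
      if (channel == null || !channelSet.contains(channel)) {
        iter.remove(); // "Channel ... not in active set."
      }
    }
    return result;
  }
}
```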
  • The sink types supported by flume. Skim them for now; a few core ones will be analyzed later.
public enum SinkType implements ComponentWithClassName {
  OTHER(null),
  NULL("org.apache.flume.sink.NullSink"),
  LOGGER("org.apache.flume.sink.LoggerSink"),
  FILE_ROLL("org.apache.flume.sink.RollingFileSink"),
  HDFS("org.apache.flume.sink.hdfs.HDFSEventSink"),
  IRC("org.apache.flume.sink.irc.IRCSink"),
  AVRO("org.apache.flume.sink.AvroSink"),
  THRIFT("org.apache.flume.sink.ThriftSink"),
  ELASTICSEARCH("org.apache.flume.sink.elasticsearch.ElasticSearchSink"),
  HBASE("org.apache.flume.sink.hbase.HBaseSink"),
  ASYNCHBASE("org.apache.flume.sink.hbase.AsyncHBaseSink"),
  MORPHLINE_SOLR("org.apache.flume.sink.solr.morphline.MorphlineSolrSink"),
  HIVE("org.apache.flume.sink.hive.HiveSink"),
  HTTP("org.apache.flume.sink.http.HttpSink");


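  • ComponentConfigurationFactory.create deserves a closer look, otherwise the object-creation logic is easy to get lost in. The type passed in may be a fully qualified class name, so create first tries to load it as a class. If loading fails, it falls into the Exception branch and creates the configuration according to the type name.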
public class ComponentConfigurationFactory {

  @SuppressWarnings("unchecked")
  public static ComponentConfiguration create(String name, String type, ComponentType component)
      throws ConfigurationException {
    Class<? extends ComponentConfiguration> confType = null;

    if (type == null) {
      throw new ConfigurationException(
          "Cannot create component without knowing its type!");
    }
    try {
      // If type names a class that loads successfully, use that class; if the class does not exist or type is an alias, the Exception branch is taken
      confType = (Class<? extends ComponentConfiguration>) Class.forName(type);
      return confType.getConstructor(String.class).newInstance(type);
    } catch (Exception ignored) {
      try {
        // Ordinary types such as type=file take this branch
        type = type.toUpperCase(Locale.ENGLISH);
        switch (component) {
          case SOURCE:
            return SourceConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case CONFIG_FILTER:
            return ConfigFilterConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case SINK:
            return SinkConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case CHANNEL:
            return ChannelConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case SINK_PROCESSOR:
            return SinkProcessorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case CHANNELSELECTOR:
            return ChannelSelectorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case SINKGROUP:
            return new SinkGroupConfiguration(name);
          default:
            throw new ConfigurationException(
                "Cannot create configuration. Unknown Type specified: " + type);
        }
      } catch (ConfigurationException e) {
        throw e;
      } catch (Exception e) {
        throw new ConfigurationException("Could not create configuration! " +
            " Due to " + e.getClass().getSimpleName() + ": " + e.getMessage(),
            e);
      }
    }
  }
}
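The order of the two lookups in create is easy to miss. The type is first treated as a class name, and only when that fails is it matched against the known aliases. The sketch below reproduces that order with a hypothetical `TwoStageResolver` and a made-up `KnownType` enum in place of flume's *ConfigurationType enums. One difference: it narrows the first catch to ClassNotFoundException, whereas flume catches Exception.

```java
import java.util.Locale;

// Sketch of the two-stage lookup in ComponentConfigurationFactory.create():
// first treat the type as a fully qualified class name, and only if that
// fails fall back to an enum of known aliases. KnownType is hypothetical.
class TwoStageResolver {
  enum KnownType {
    MEMORY("memory-config"),
    FILE("file-config");

    final String description;
    KnownType(String description) { this.description = description; }
  }

  static String resolve(String type) {
    if (type == null) {
      throw new IllegalArgumentException("Cannot create component without knowing its type!");
    }
    try {
      // Stage 1: a custom component given by its full class name.
      Class<?> clazz = Class.forName(type);
      return "class:" + clazz.getName();
    } catch (ClassNotFoundException ignored) {
      // Stage 2: a short alias such as "memory" or "file".
      try {
        return "alias:" + KnownType.valueOf(type.toUpperCase(Locale.ENGLISH)).description;
      } catch (IllegalArgumentException e) {
        throw new IllegalArgumentException("Unknown Type specified: " + type, e);
      }
    }
  }
}
```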


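  • Next, the getConfiguration method called from that Exception branch, which is also central. We use SourceConfigurationType.getConfiguration as the example; the others work the same way. The class named by srcConfigurationName may well not be found. In that case the ClassNotFoundException branch is taken, and it creates a SourceConfiguration marked as NotFoundConfigClass. Channels and sinks behave the same way.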
public SourceConfiguration getConfiguration(String name)
        throws ConfigurationException {
      if (this == OTHER) {
        return new SourceConfiguration(name);
      }
      Class<? extends SourceConfiguration> clazz = null;
      SourceConfiguration instance = null;
      try {
        if (srcConfigurationName != null) {
          clazz =
              (Class<? extends SourceConfiguration>) Class
                  .forName(srcConfigurationName);
          instance = clazz.getConstructor(String.class).newInstance(name);
        } else {
          // Could not find the configuration stub, do basic validation
          instance = new SourceConfiguration(name);
          // Let the caller know that this was created because of this exception.
          instance.setNotFoundConfigClass();
        }
      } catch (ClassNotFoundException e) {
        // The class above could not be found, so this branch runs: a plain SourceConfiguration is created and flagged via setNotFoundConfigClass()
        // Could not find the configuration stub, do basic validation
        instance = new SourceConfiguration(name);
        // Let the caller know that this was created because of this exception.
        instance.setNotFoundConfigClass();
      } catch (Exception e) {
        throw new ConfigurationException("Error creating configuration", e);
      }
      return instance;
    }
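The essential behavior of getConfiguration is that a missing configuration class is not an error. Instead, it degrades to a basic configuration object with the not-found flag set. The minimal sketch below shows that fallback. `BasicConf` and `ConfigFallback` are hypothetical stand-ins for SourceConfiguration and SourceConfigurationType, and the OTHER special case is left out.

```java
// Sketch of the fallback in SourceConfigurationType.getConfiguration():
// if the dedicated configuration class cannot be loaded, return a basic
// configuration object flagged as "not found" instead of failing.
class BasicConf {
  final String name;
  private boolean notFoundConfigClass;
  BasicConf(String name) { this.name = name; }
  void setNotFoundConfigClass() { notFoundConfigClass = true; }
  boolean isNotFoundConfigClass() { return notFoundConfigClass; }
}

class ConfigFallback {
  static BasicConf getConfiguration(String name, String configClassName) {
    if (configClassName == null) {
      // No configuration stub registered: basic validation only, flagged.
      BasicConf instance = new BasicConf(name);
      instance.setNotFoundConfigClass();
      return instance;
    }
    try {
      Class<? extends BasicConf> clazz =
          Class.forName(configClassName).asSubclass(BasicConf.class);
      return clazz.getConstructor(String.class).newInstance(name);
    } catch (ClassNotFoundException e) {
      // Same outcome as the null case: basic validation only, flagged.
      BasicConf instance = new BasicConf(name);
      instance.setNotFoundConfigClass();
      return instance;
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException("Error creating configuration", e);
    }
  }
}
```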


Flume object creation flow

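  • Objects are created by loadChannels, loadSources and loadSinks, then added to conf through addChannel, addSourceRunner and addSinkRunner. Finally, all services are started from conf. Here we focus on how the three kinds of objects are loaded.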
public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        // Load the channel objects
        loadChannels(agentConf, channelComponentMap);

        // Load the source objects
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);

        // Load the sink objects
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap =
                channelCache.get(channelComponent.channel.getClass());
            if (nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }
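getConfiguration only registers channels that something is connected to. It loops over a copy of the key set (`new HashSet<String>(channelComponentMap.keySet())`) because it removes entries from channelComponentMap inside the loop. The minimal sketch below shows that pruning. Each ChannelComponent is reduced to the list of names in its `components` field, which is an assumption made for illustration, and `ChannelPruner` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the pruning step in getConfiguration(): a channel that no source
// or sink is connected to is dropped. Each list mirrors ChannelComponent.components.
class ChannelPruner {
  static Map<String, List<String>> prune(Map<String, List<String>> channelComponentMap) {
    Map<String, List<String>> active = new HashMap<>();
    // Iterate over a copy of the key set so that removing entries is safe.
    Set<String> channelNames = new HashSet<>(channelComponentMap.keySet());
    for (String channelName : channelNames) {
      List<String> components = channelComponentMap.get(channelName);
      if (components.isEmpty()) {
        System.out.printf("Channel %s has no components connected and has been removed.%n",
            channelName);
        channelComponentMap.remove(channelName);
      } else {
        active.put(channelName, new ArrayList<>(components));
      }
    }
    return active;
  }
}
```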


Channel object initialization

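  • The logic has three steps. First, initialize channels from channelConfigMap. Second, initialize channels from channelContextMap. Third, remove channels that are no longer used; this part exists to support dynamic changes to the flume configuration. Let's look at how a single channel is created.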
private void loadChannels(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap)
          throws InstantiationException {
    LOGGER.info("Creating channels");

    // channelsNotReused starts out holding every cached (old) channel
    ListMultimap<Class<? extends Channel>, String> channelsNotReused =
        ArrayListMultimap.create();
    // assume all channels will not be re-used
    for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
         channelCache.entrySet()) {
      Class<? extends Channel> channelKlass = entry.getKey();
      Set<String> channelNames = entry.getValue().keySet();
      channelsNotReused.get(channelKlass).addAll(channelNames);
    }

    // Names of all valid channels
    Set<String> channelNames = agentConf.getChannelSet();
    // channelConfigMap: the channels that have a configuration object are initialized first
    Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
    /*
     * Components which have a ComponentConfiguration object
     */
    for (String chName : channelNames) {
      // Look the channel up in compMap (channels that have a ComponentConfiguration)
      ComponentConfiguration comp = compMap.get(chName);
      if (comp != null) {
        // A reused channel is removed from channelsNotReused to mark it as reused; a new channel is created if needed
        Channel channel = getOrCreateChannel(channelsNotReused,
            comp.getComponentName(), comp.getType());
        try {
          // The key step: load the configuration into the channel, which effectively initializes it
          Configurables.configure(channel, comp);
          // channelComponentMap collects every created channel
          channelComponentMap.put(comp.getComponentName(),
              new ChannelComponent(channel));
          LOGGER.info("Created channel " + chName);
        } catch (Exception e) {
          String msg = String.format("Channel %s has been removed due to an " +
              "error during configuration", chName);
          LOGGER.error(msg, e);
        }
      }
    }
   
    // Second pass: channels that only have a raw Context (channelContextMap)
    for (String chName : channelNames) {
      // Look the channel up in channelContextMap
      Context context = agentConf.getChannelContext().get(chName);
      if (context != null) {
        Channel channel = getOrCreateChannel(channelsNotReused, chName,
            context.getString(BasicConfigurationConstants.CONFIG_TYPE));
        try {
          // The key step: load the configuration into the channel, which effectively initializes it
          Configurables.configure(channel, context);
          channelComponentMap.put(chName, new ChannelComponent(channel));
          LOGGER.info("Created channel " + chName);
        } catch (Exception e) {
          String msg = String.format("Channel %s has been removed due to an " +
              "error during configuration", chName);
          LOGGER.error(msg, e);
        }
      }
    }
   
    // Remove cached channels that were not reused
    for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
      Map<String, Channel> channelMap = channelCache.get(channelKlass);
      if (channelMap != null) {
        for (String channelName : channelsNotReused.get(channelKlass)) {
          if (channelMap.remove(channelName) != null) {
            LOGGER.info("Removed {} of type {}", channelName, channelKlass);
          }
        }
        if (channelMap.isEmpty()) {
          channelCache.remove(channelKlass);
        }
      }
    }
  }
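The bookkeeping around channelsNotReused is easier to follow in isolation. Assume no cached channel will be reused, strike out each name the new configuration asks for, and evict whatever is left. The sketch below does exactly that with a hypothetical `ChannelCacheSketch`. It uses plain Objects as channels and a type string as the cache key instead of a Class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the bookkeeping in loadChannels(): assume no cached channel will
// be reused, strike out each name the new configuration asks for, and evict
// whatever is left. Channels are plain Objects keyed by a type string here.
class ChannelCacheSketch {
  // type -> (channel name -> instance), like channelCache
  final Map<String, Map<String, Object>> channelCache = new HashMap<>();

  /** Applies a new configuration (channel name -> type) and returns the evicted names. */
  List<String> reload(Map<String, String> newChannels) {
    // Step 0: assume nothing is reused.
    Map<String, List<String>> notReused = new HashMap<>();
    for (Map.Entry<String, Map<String, Object>> e : channelCache.entrySet()) {
      notReused.put(e.getKey(), new ArrayList<>(e.getValue().keySet()));
    }
    // Steps 1-2: get or create each configured channel and mark it as reused.
    for (Map.Entry<String, String> e : newChannels.entrySet()) {
      String name = e.getKey();
      String type = e.getValue();
      channelCache.computeIfAbsent(type, t -> new HashMap<>())
          .computeIfAbsent(name, n -> new Object());
      notReused.getOrDefault(type, new ArrayList<>()).remove(name);
    }
    // Step 3: evict the leftovers and drop empty per-type maps.
    List<String> evicted = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : notReused.entrySet()) {
      Map<String, Object> byName = channelCache.get(e.getKey());
      for (String name : e.getValue()) {
        if (byName.remove(name) != null) {
          evicted.add(name);
        }
      }
      if (byName.isEmpty()) {
        channelCache.remove(e.getKey());
      }
    }
    return evicted;
  }
}
```

Suppose a channel keeps its name but changes type. The old instance is cached under the old type and is never struck out, so it gets evicted, and a new instance is created under the new type.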


  • Creating a single channel takes two steps: creating the channel, done by getOrCreateChannel, and configuring it, done by configure.
  • channelFactory.getClass(type) looks up the corresponding channel class, and channelFactory.create(name, type) creates the object.
private Channel getOrCreateChannel(
      ListMultimap<Class<? extends Channel>, String> channelsNotReused,
      String name, String type)
      throws FlumeException {

    Class<? extends Channel> channelClass = channelFactory.getClass(type);
    /*
     * Channel has requested a new instance on each re-configuration
     */
    if (channelClass.isAnnotationPresent(Disposable.class)) {
      Channel channel = channelFactory.create(name, type);
      channel.setName(name);
      return channel;
    }

    // channelCache is keyed by channel class; each value maps a channel name to its channel instance
    Map<String, Channel> channelMap = channelCache.get(channelClass);
    if (channelMap == null) {
      channelMap = new HashMap<String, Channel>();
      channelCache.put(channelClass, channelMap);
    }
    // name is the channel's name
    Channel channel = channelMap.get(name);
    if (channel == null) {
      // Create the channel object
      channel = channelFactory.create(name, type);
      channel.setName(name);
      channelMap.put(name, channel);
    }

    // The channel is in use, so remove it from channelsNotReused
    channelsNotReused.get(channelClass).remove(name);
    return channel;
  }
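getOrCreateChannel checks the Disposable annotation before touching the cache. An annotated channel class therefore gets a brand new instance on every reconfiguration, while other classes are reused by name. The sketch below reproduces that decision with a locally defined `@FreshEachTime` annotation, a hypothetical stand-in for Disposable, and creates instances with `getDeclaredConstructor().newInstance()`. `GetOrCreateSketch`, `PooledChannel` and `ThrowawayChannel` are made-up names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker standing in for flume's @Disposable. RUNTIME retention
// is required, otherwise isAnnotationPresent() cannot see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface FreshEachTime {}

class PooledChannel {}

@FreshEachTime
class ThrowawayChannel {}

class GetOrCreateSketch {
  // class -> (channel name -> instance), like channelCache
  final Map<Class<?>, Map<String, Object>> cache = new HashMap<>();

  private static Object newInstance(Class<?> clazz) {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Unable to create channel of " + clazz.getName(), e);
    }
  }

  Object getOrCreate(String name, Class<?> channelClass) {
    if (channelClass.isAnnotationPresent(FreshEachTime.class)) {
      return newInstance(channelClass); // never cached: new instance on every call
    }
    return cache.computeIfAbsent(channelClass, c -> new HashMap<>())
        .computeIfAbsent(name, n -> newInstance(channelClass));
  }
}
```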
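  • channelFactory.getClass maps the type to a class name through ChannelType and loads that class; an unknown type is treated as a custom class name. create then instantiates the class with newInstance().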
@Override
  public Channel create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    logger.info("Creating instance of channel {} type {}", name, type);
    Class<? extends Channel> channelClass = getClass(type);
    try {
      return channelClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to create channel: " + name
          + ", type: " + type + ", class: " + channelClass.getName(), ex);
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public Class<? extends Channel> getClass(String type) throws FlumeException {
    String channelClassName = type;
    ChannelType channelType = ChannelType.OTHER;
    try {
      channelType = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      logger.debug("Channel type {} is a custom type", type);
    }
    if (!channelType.equals(ChannelType.OTHER)) {
      channelClassName = channelType.getChannelClassName();
    }
    try {
      return (Class<? extends Channel>) Class.forName(channelClassName);
    } catch (Exception ex) {
      throw new FlumeException("Unable to load channel type: " + type
          + ", class: " + channelClassName, ex);
    }
  }
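The lookup order here is the reverse of ComponentConfigurationFactory.create. The type is first matched against the ChannelType aliases, and only an unknown type is treated as a custom class name. The minimal sketch below follows the same logic. `AliasFirstFactory` and its `Kind` enum are hypothetical, and the aliases map to JDK collection classes only so that the sketch runs without flume on the classpath. It uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` has been deprecated since Java 9.

```java
import java.util.Locale;

// Sketch of channelFactory.getClass()/create(): known aliases are mapped to a
// class name through an enum; anything else is treated as a custom fully
// qualified class name. Kind maps to JDK classes purely for illustration.
class AliasFirstFactory {
  enum Kind {
    OTHER(null),
    LIST("java.util.ArrayList"),
    MAP("java.util.HashMap");

    final String className;
    Kind(String className) { this.className = className; }
  }

  static Class<?> resolveClass(String type) {
    String className = type;
    Kind kind = Kind.OTHER;
    try {
      kind = Kind.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      // Not a known alias: treat the type as a custom class name.
    }
    if (kind != Kind.OTHER) {
      className = kind.className;
    }
    try {
      return Class.forName(className);
    } catch (ClassNotFoundException ex) {
      throw new IllegalStateException("Unable to load type: " + type + ", class: " + className, ex);
    }
  }

  static Object create(String name, String type) {
    Class<?> clazz = resolveClass(type);
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException ex) {
      throw new IllegalStateException("Unable to create: " + name + ", type: " + type, ex);
    }
  }
}
```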
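  • For reference, here is the ChannelType definition again:
public enum ChannelType implements ComponentWithClassName {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");

  // ... (constructor and getChannelClassName() omitted)
}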
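  • Configurables.configure configures the channel by delegating to the channel's own configure method. Which overload runs depends on the argument passed in: a Context or a ComponentConfiguration. Taking JdbcChannel as an example, the call chain is: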
// ----- step1: Configurables.configure, overloaded on the argument type -----
public static boolean configure(Object target, Context context) {
    if (target instanceof Configurable) {
      ((Configurable) target).configure(context);
      return true;
    }

    return false;
  }

  public static boolean configure(Object target, ComponentConfiguration conf) {
    if (target instanceof ConfigurableComponent) {
      ((ConfigurableComponent) target).configure(conf);
      return true;
    }
    return false;
  }

  // ----- step2: JdbcChannel.configure -----
  @Override
  public void configure(Context context) {
    provider = JdbcChannelProviderFactory.getProvider(context, getName());

    LOG.info("JDBC Channel initialized: " + getName());
  }

  // ---------- step3: JdbcChannelProviderFactory.getProvider ----------
  public static synchronized JdbcChannelProvider getProvider(
      Context context, String name) {
    if (PROVIDER == null) {
      PROVIDER = new JdbcChannelProviderImpl();
      PROVIDER.initialize(context);
    }

    if (!INSTANCES.add(name)) {
      throw new JdbcChannelException("Attempt to initialize multiple "
           + "channels with same name: " + name);
    }

    return PROVIDER;
  }

// ---------- step4: JdbcChannelProviderImpl.initialize ----------
public void initialize(Context context) {
    LOGGER.debug("Initializing JDBC Channel provider");

    initializeSystemProperties(context);
    initializeDataSource(context);
    initializeSchema(context);
    initializeChannelState(context);
  }

  // Method bodies omitted here; the full versions live in JdbcChannelProviderImpl.
  private void initializeSystemProperties(Context context) {}
  private void initializeChannelState(Context context) {}
  private void initializeSchema(Context context) {}
  private void initializeDataSource(Context context) {}
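The four steps above rest on two ideas. First, Configurables.configure only dispatches on the interface the target implements. Second, the JDBC channel shares one lazily created provider across all JDBC channels and rejects a duplicate channel name. The following is a minimal, self-contained sketch of that control flow, assuming Flume's `Context` can be modelled as a plain `Map<String, String>`. Every type name here (`ConfigurablesSketch`, `Provider`, `JdbcLikeChannel`) and the `url` key are hypothetical, not Flume's API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of: Configurables.configure -> channel.configure -> getProvider.
// Flume's Context is modelled as a Map<String, String>; none of these types are Flume's API.
final class ConfigurablesSketch {

  // Stand-in for org.apache.flume.conf.Configurable.
  interface Configurable {
    void configure(Map<String, String> context);
  }

  // step1: configure the target only if it implements Configurable; report whether it did.
  static boolean configure(Object target, Map<String, String> context) {
    if (target instanceof Configurable) {
      ((Configurable) target).configure(context);
      return true;
    }
    return false;
  }

  // step3/step4: one provider per JVM, initialized from the first context it sees.
  static final class Provider {
    final String url;

    Provider(Map<String, String> context) {
      // The real provider initializes system properties, data source, schema and state here;
      // this sketch only records a hypothetical "url" setting.
      this.url = context.get("url");
    }
  }

  private static Provider PROVIDER;
  private static final Set<String> INSTANCES = new HashSet<>();

  static synchronized Provider getProvider(Map<String, String> context, String name) {
    if (PROVIDER == null) {
      PROVIDER = new Provider(context);
    }
    if (!INSTANCES.add(name)) {
      throw new IllegalStateException(
          "Attempt to initialize multiple channels with same name: " + name);
    }
    return PROVIDER;
  }

  // step2: a JDBC-like channel whose configure() simply obtains the shared provider.
  static final class JdbcLikeChannel implements Configurable {
    final String name;
    Provider sharedProvider;

    JdbcLikeChannel(String name) {
      this.name = name;
    }

    @Override
    public void configure(Map<String, String> context) {
      sharedProvider = getProvider(context, name);
    }
  }
}
```

One consequence is visible in the original getProvider as well. The provider is initialized with the Context of whichever JDBC channel is configured first, and later JDBC channels only register their names.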


Source object initialization

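  • Source creation closely mirrors channel creation: sourceFactory.create creates the source object and Configurables.configure configures it. The one extra step is connecting the source to its channels (through a ChannelSelector and a ChannelProcessor). Because the process is so similar, we won't trace into the details again.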
private void loadSources(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap,
      Map<String, SourceRunner> sourceRunnerMap)
      throws InstantiationException {

    Set<String> sourceNames = agentConf.getSourceSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSourceConfigMap();
    /*
     * Components which have a ComponentConfiguration object
     */
    for (String sourceName : sourceNames) {
      ComponentConfiguration comp = compMap.get(sourceName);
      if (comp != null) {
        SourceConfiguration config = (SourceConfiguration) comp;

        // Create the source object
        Source source = sourceFactory.create(comp.getComponentName(),
            comp.getType());

        try {
          // Configure the source
          Configurables.configure(source, config);

          // Read the source's channel names from its config, i.e. its connection info
          Set<String> channelNames = config.getChannels();
          List<Channel> sourceChannels = new ArrayList<Channel>();
          for (String chName : channelNames) {
            ChannelComponent channelComponent = channelComponentMap.get(chName);
            if (channelComponent != null) {
              sourceChannels.add(channelComponent.channel);
            }
          }

          // The source is not connected to any channel
          if (sourceChannels.isEmpty()) {
            String msg = String.format("Source %s is not connected to a " +
                "channel",  sourceName);
            throw new IllegalStateException(msg);
          }

          // The source config carries a ChannelSelectorConfiguration
          ChannelSelectorConfiguration selectorConfig =
              config.getSelectorConfiguration();

          // Create the ChannelSelector; the default is the replicating selector
          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

          // The ChannelProcessor wraps the selector and the interceptors; configure it from the source config
          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, config);

          // Attach the ChannelProcessor to the source
          source.setChannelProcessor(channelProcessor);

          // SourceRunner.forSource creates the runner internally
          sourceRunnerMap.put(comp.getComponentName(),
              SourceRunner.forSource(source));

          // Reverse link: record this source on each channel it is connected to
          for (Channel channel : sourceChannels) {
            ChannelComponent channelComponent =
                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                                           String.format("Channel %s", channel.getName()));
            channelComponent.components.add(sourceName);
          }
        } catch (Exception e) {
          String msg = String.format("Source %s has been removed due to an " +
              "error during configuration", sourceName);
          LOGGER.error(msg, e);
        }
      }
    }
    /*
     * Components which DO NOT have a ComponentConfiguration object
     * and use only Context
     */
    Map<String, Context> sourceContexts = agentConf.getSourceContext();
    for (String sourceName : sourceNames) {
      Context context = sourceContexts.get(sourceName);
      if (context != null) {
        Source source =
            sourceFactory.create(sourceName,
                                 context.getString(BasicConfigurationConstants.CONFIG_TYPE));
        try {
          Configurables.configure(source, context);
          List<Channel> sourceChannels = new ArrayList<Channel>();
          String[] channelNames = context.getString(
              BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
          for (String chName : channelNames) {
            ChannelComponent channelComponent = channelComponentMap.get(chName);
            if (channelComponent != null) {
              sourceChannels.add(channelComponent.channel);
            }
          }
          if (sourceChannels.isEmpty()) {
            String msg = String.format("Source %s is not connected to a " +
                "channel",  sourceName);
            throw new IllegalStateException(msg);
          }
          Map<String, String> selectorConfig = context.getSubProperties(
              BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);

          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, context);
          source.setChannelProcessor(channelProcessor);
          sourceRunnerMap.put(sourceName,
              SourceRunner.forSource(source));
          for (Channel channel : sourceChannels) {
            ChannelComponent channelComponent =
                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                                           String.format("Channel %s", channel.getName()));
            channelComponent.components.add(sourceName);
          }
        } catch (Exception e) {
          String msg = String.format("Source %s has been removed due to an " +
              "error during configuration", sourceName);
          LOGGER.error(msg, e);
        }
      }
    }
  }
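Both branches of loadSources follow the same wiring rule:

1. Resolve each configured channel name.
2. Skip names that have no channel.
3. Reject a source that ends up with no channel at all.
4. Record the source name on every channel it writes to.

In the Context-only branch, the channel list is a single whitespace-separated string. The sketch below isolates that rule. `SourceWiringSketch` and its `ChannelComponent` are simplified, hypothetical stand-ins (Flume's ChannelComponent pairs a channel with a list of attached component names). The sketch builds no selector, ChannelProcessor or SourceRunner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of how loadSources connects a source to its channels (not Flume code).
final class SourceWiringSketch {

  // Simplified stand-in for ChannelComponent: a channel name plus its attached component names.
  static final class ChannelComponent {
    final String channelName;
    final List<String> components = new ArrayList<>();

    ChannelComponent(String channelName) {
      this.channelName = channelName;
    }
  }

  // Resolves a whitespace-separated channel list, then records the source on each resolved
  // channel. Returns the names of the channels actually used.
  static List<String> wireSource(String sourceName, String channelsProperty,
                                 Map<String, ChannelComponent> channelComponentMap) {
    List<ChannelComponent> sourceChannels = new ArrayList<>();
    for (String chName : channelsProperty.split("\\s+")) {
      ChannelComponent component = channelComponentMap.get(chName);
      if (component != null) {            // unknown channel names are skipped, as in loadSources
        sourceChannels.add(component);
      }
    }
    if (sourceChannels.isEmpty()) {
      throw new IllegalStateException(
          String.format("Source %s is not connected to a channel", sourceName));
    }
    // Selector, ChannelProcessor and SourceRunner creation would happen here in Flume.
    List<String> used = new ArrayList<>();
    for (ChannelComponent component : sourceChannels) {
      component.components.add(sourceName); // reverse link: channel -> source
      used.add(component.channelName);
    }
    return used;
  }
}
```

In loadSources itself, this exception does not stop the agent. It is caught per source, logged as "Source %s has been removed due to an error during configuration", and the loop moves on to the next source.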


Sink object initialization

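  • Sink creation is likewise similar to channel creation: sinkFactory.create creates the sink object and Configurables.configure configures it. The extra step is binding the sink to its channel; unlike a source, a sink reads from exactly one channel. Because the process is so similar, we won't trace into the details again.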
private void loadSinks(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
      throws InstantiationException {
    // Get the set of sink names
    Set<String> sinkNames = agentConf.getSinkSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSinkConfigMap();

    Map<String, Sink> sinks = new HashMap<String, Sink>();
    /*
     * Components which have a ComponentConfiguration object
     */
    for (String sinkName : sinkNames) {
      // Look up the sink's configuration and create the sink object
      ComponentConfiguration comp = compMap.get(sinkName);
      if (comp != null) {
        SinkConfiguration config = (SinkConfiguration) comp;
        Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
        try {
          // Configure the sink object
          Configurables.configure(sink, config);

          // Find the channel this sink is bound to
          ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
          if (channelComponent == null) {
            String msg = String.format("Sink %s is not connected to a " +
                "channel",  sinkName);
            throw new IllegalStateException(msg);
          }

          // Set the channel on the sink
          sink.setChannel(channelComponent.channel);
          sinks.put(comp.getComponentName(), sink);
          channelComponent.components.add(sinkName);
        } catch (Exception e) {
          String msg = String.format("Sink %s has been removed due to an " +
              "error during configuration", sinkName);
          LOGGER.error(msg, e);
        }
      }
    }
    /*
     * Components which DO NOT have a ComponentConfiguration object
     * and use only Context
     * (handles sinks that have no ComponentConfiguration object)
     */
    Map<String, Context> sinkContexts = agentConf.getSinkContext();
    for (String sinkName : sinkNames) {
      Context context = sinkContexts.get(sinkName);
      if (context != null) {
        Sink sink = sinkFactory.create(sinkName, context.getString(
            BasicConfigurationConstants.CONFIG_TYPE));
        try {
          Configurables.configure(sink, context);
          ChannelComponent channelComponent =
              channelComponentMap.get(
                  context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
          if (channelComponent == null) {
            String msg = String.format("Sink %s is not connected to a " +
                "channel",  sinkName);
            throw new IllegalStateException(msg);
          }
          sink.setChannel(channelComponent.channel);
          sinks.put(sinkName, sink);
          channelComponent.components.add(sinkName);
        } catch (Exception e) {
          String msg = String.format("Sink %s has been removed due to an " +
              "error during configuration", sinkName);
          LOGGER.error(msg, e);
        }
      }
    }

    loadSinkGroups(agentConf, sinks, sinkRunnerMap);
  }
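loadSinks differs from loadSources in two ways. Each sink names exactly one channel, and a configuration error drops only the offending sink while the remaining sinks are still wired. The following is a minimal sketch of that behaviour, assuming each sink is reduced to a name and the value of its single channel property. `SinkWiringSketch` and its `ChannelComponent` are hypothetical simplifications, not Flume's classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of loadSinks: one channel per sink, failures drop only that sink.
final class SinkWiringSketch {

  // Simplified stand-in for ChannelComponent: the names of components attached to a channel.
  static final class ChannelComponent {
    final List<String> components = new ArrayList<>();
  }

  // sinkToChannel: sink name -> value of its single "channel" property.
  // removed: collects the names of sinks dropped because of a configuration error.
  // Returns sink name -> channel name for every sink that was wired successfully.
  static Map<String, String> loadSinks(Map<String, String> sinkToChannel,
                                       Map<String, ChannelComponent> channelComponentMap,
                                       List<String> removed) {
    Map<String, String> sinks = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : sinkToChannel.entrySet()) {
      String sinkName = e.getKey();
      try {
        ChannelComponent component = channelComponentMap.get(e.getValue());
        if (component == null) {
          throw new IllegalStateException(
              String.format("Sink %s is not connected to a channel", sinkName));
        }
        sinks.put(sinkName, e.getValue());   // stands in for sink.setChannel(...) + sinks.put(...)
        component.components.add(sinkName);  // reverse link: channel -> sink
      } catch (IllegalStateException ex) {
        // Flume logs "Sink %s has been removed due to an error during configuration" and continues.
        removed.add(sinkName);
      }
    }
    return sinks; // Flume then passes the surviving sinks to loadSinkGroups(...)
  }
}
```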