MGR插件初始化过程分析

  1. 云栖社区>
  2. 博客>
  3. 正文

MGR插件初始化过程分析

sun_ashe 2019-07-15 19:26:28 浏览159
展开阅读全文

一、INSTALL PLUGIN group_replication SONAME 'group_replication.so'

其实在MySQL中,不同的插件初始化过程都很类似,server 层的调用栈如下

(lldb) bt
* thread #20, stop reason = step over
  * frame #0: 0x000000011d4a6282 group_replication.so`plugin_group_replication_init(plugin_info=0x00007fb910815d38) at plugin.cc:1027
    frame #1: 0x00000001015e2ee4 mysqld`plugin_initialize(plugin=0x00007fb910815d38) at sql_plugin.cc:1244
    frame #2: 0x00000001015e7803 mysqld`mysql_install_plugin(thd=0x00007fb913859a00, name=0x00007fb91009fe18, dl=0x00007fb91009fe28) at sql_plugin.cc:2224
    frame #3: 0x00000001015e73e3 mysqld`Sql_cmd_install_plugin::execute(this=0x00007fb91009fe10, thd=0x00007fb913859a00) at sql_plugin.cc:4435
    frame #4: 0x00000001015abcc4 mysqld`mysql_execute_command(thd=0x00007fb913859a00, first_level=true) at sql_parse.cc:4835
    frame #5: 0x00000001015a10f5 mysqld`mysql_parse(thd=0x00007fb913859a00, parser_state=0x0000700001a95250) at sql_parse.cc:5570
    frame #6: 0x000000010159ded8 mysqld`dispatch_command(thd=0x00007fb913859a00, com_data=0x0000700001a95db8, command=COM_QUERY) at sql_parse.cc:1484
    frame #7: 0x00000001015a0216 mysqld`do_command(thd=0x00007fb913859a00) at sql_parse.cc:1025
    frame #8: 0x0000000101738c40 mysqld`::handle_connection(arg=0x00007fb91109f890) at connection_handler_per_thread.cc:300
    frame #9: 0x0000000101f35ffc mysqld`::pfs_spawn_thread(arg=0x00007fb911004ae0) at pfs.cc:2190
    frame #10: 0x00007fff784c2661 libsystem_pthread.dylib`_pthread_body + 340
    frame #11: 0x00007fff784c250d libsystem_pthread.dylib`_pthread_start + 377
    frame #12: 0x00007fff784c1bf9 libsystem_pthread.dylib`thread_start + 13

所有的插件初始化都会定义如下的初始化函数,对应到group_replication插件中如下

int plugin_group_replication_init(MYSQL_PLUGIN plugin_info)

初始化的过程如下:

二、 如果开启了performance_schema,注册相关监控项

  // Register all PSI keys at the time plugin init
#ifdef HAVE_PSI_INTERFACE
  register_all_group_replication_psi_keys();
#endif /* HAVE_PSI_INTERFACE */

register_all_group_replication_psi_keys();

void register_all_group_replication_psi_keys()
{
  register_group_replication_mutex_psi_keys(all_group_replication_psi_mutex_keys,
                                            array_elements(all_group_replication_psi_mutex_keys));
  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));
  register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));
  register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));
}

2.1 注册mutex 监控项,在performance_schema开启的情况下,就可以在对应的表中查看这些mutex的占用情况。

static PSI_mutex_info all_group_replication_psi_mutex_keys[]=
{
  {&key_GR_LOCK_applier_module_run, "LOCK_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_applier_module_suspend, "LOCK_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_run, "LOCK_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_dispatcher_run, "LOCK_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_certification_info, "LOCK_certification_info", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_members, "LOCK_certification_members", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_channel_observation_list, "LOCK_channel_observation_list", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_run, "LOCK_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_server_ready, "LOCK_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "key_GR_LOCK_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "key_GR_LOCK_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_view_modification_wait, "LOCK_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_info_manager, "LOCK_group_info_manager", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_continuation, "LOCK_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_synchronized_queue, "LOCK_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_count_down_latch, "LOCK_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_wait_ticket, "LOCK_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_module_run, "LOCK_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery, "LOCK_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_donor_selection, "LOCK_recovery_donor_selection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_method_exec, "LOCK_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_run, "LOCK_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_plugin_running, "LOCK_plugin_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_force_members_running, "LOCK_force_members_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_write_lock_protection, "LOCK_write_lock_protection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_flow_control, "LOCK_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_transactions_waiting_apply, "LOCK_pipeline_stats_transactions_waiting_apply", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_trx_unlocking, "LOCK_transaction_unblocking", PSI_FLAG_GLOBAL}
};

2.2 注册条件变量监控

  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));

group_replication涉及到的条件变量有如下

static PSI_cond_info all_group_replication_psi_condition_keys[]=
{
  {&key_GR_COND_applier_module_run, "COND_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_suspend,  "COND_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_wait, "COND_applier_module_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_run, "COND_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_dispatcher_run, "COND_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_run,  "COND_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_server_ready, "COND_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "COND_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "COND_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_COND_view_modification_wait, "COND_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_continuation, "COND_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_COND_synchronized_queue, "COND_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_COND_count_down_latch, "COND_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_COND_wait_ticket, "COND_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery_module_run, "COND_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery, "COND_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_method_exec, "COND_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_run, "COND_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_stats_flow_control, "COND_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
};

2.3 注册group_replication相关的线程

register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));

group_replication中涉及到如下线程

static PSI_thread_info all_group_replication_psi_thread_keys[]=
{
  {&key_GR_THD_applier_module_receiver, "THD_applier_module_receiver", PSI_FLAG_GLOBAL},
  {&key_GR_THD_cert_broadcast, "THD_certifier_broadcast", PSI_FLAG_GLOBAL},
  {&key_GR_THD_delayed_init, "THD_delayed_initialization", PSI_FLAG_GLOBAL},
  {&key_GR_THD_plugin_session, "THD_plugin_server_session", PSI_FLAG_GLOBAL},
  {&key_GR_THD_group_partition_handler, "THD_group_partition_handler", PSI_FLAG_GLOBAL},
  {&key_GR_THD_recovery, "THD_recovery", PSI_FLAG_GLOBAL}
};

2.4 注册读写锁监控

register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));

涉及到的读写锁有如下

static PSI_rwlock_info all_group_replication_psi_rwlock_keys[]=
{
  {&key_GR_RWLOCK_cert_stable_gtid_set, "RWLOCK_certifier_stable_gtid_set", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_io_cache_unused_list , "RWLOCK_io_cache_unused_list", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_plugin_stop, "RWLOCK_plugin_stop", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations, "RWLOCK_gcs_operations", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations_finalize_ongoing, "RWLOCK_gcs_operations_finalize_ongoing", PSI_FLAG_GLOBAL}
};

三、初始化会使用到的互斥锁,读写锁等

  mysql_mutex_init(key_GR_LOCK_plugin_running, &plugin_running_mutex,
                   MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_GR_LOCK_force_members_running,
                   &force_members_running_mutex,
                   MY_MUTEX_INIT_FAST);

  plugin_stop_lock= new Checkable_rwlock(
#ifdef HAVE_PSI_INTERFACE
                                         key_GR_RWLOCK_plugin_stop
#endif /* HAVE_PSI_INTERFACE */
                                        );

  shared_plugin_stop_lock= new Shared_writelock(plugin_stop_lock);
  • plugin_running_mutex

负责保护变量group_replication_running

  • force_members_running_mutex

负责保护force_members_running

四、作为观察者向server注册回调函数

注册的目的是为了在某些过程中的回调。

4.1 注册server_state_observer

server state observer主要用于实现,在数据库启动停止时的回调动作。如下:

Server_state_observer server_state_observer = {
  sizeof(Server_state_observer),

  group_replication_before_handle_connection,  //before the client connects to the server
  group_replication_before_recovery,           //before recovery
  group_replication_after_engine_recovery,     //after engine recovery
  group_replication_after_recovery,            //after recovery
  group_replication_before_server_shutdown,    //before shutdown
  group_replication_after_server_shutdown,     //after shutdown
};

但其实group_replication并不关心这些操作,所以回调函数基本直接返回。


/*
  DBMS lifecycle events observers.
*/
int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

int group_replication_before_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_engine_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_before_server_shutdown(Server_state_param *param)
{
  return 0;
}

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

在用户链接时,需要做如下操作, 暂时没有细究,应该是在切换,或者初始化时等待状态的切换完成。

int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

在数据库shutdown时,需要关闭MGR插件,所以执行函数plugin_group_replication_stop().

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

4.2 Register a transaction observer

注册事务操作回调

  if (register_trans_observer(&trans_observer, (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the transactions state observers");
    return 1;
    /* purecov: end */
  }

事务操作的前后需要进行的回调操作,如下,主要的功能点也在这里实现。

Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,
  group_replication_trans_before_commit,
  group_replication_trans_before_rollback,
  group_replication_trans_after_commit,
  group_replication_trans_after_rollback,
};
  • group_replication_trans_before_dml
    在进行dml操作前,MGR需要提前进行一些检查操作,如下

image

  • group_replication_trans_before_commit
    更多的一致性实现过程在group_replication_trans_before_commit中。

4.3 Register a binlog transmit observer

这部分注册,用于在binlog的发送过程中进行回调

  if (register_binlog_transmit_observer(&binlog_transmit_observer,
                                        (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the binlog state observers");
    return 1;
    /* purecov: end */
  }

这部分分为如下几种情况,在semisync插件中这部分应用的很多,但是在group_replication插件中,这部分基本没有使用,也就是回调函数不做操作。只有group_replication_reset_master_logs,设置了一个内部的变量known_server_reset;

Binlog_transmit_observer binlog_transmit_observer = {
  sizeof(Binlog_transmit_observer),

  group_replication_transmit_start,     // transmit_start,
  group_replication_transmit_stop,      // transmit_stop,
  group_replication_reserve_header,     // reserve_header,
  group_replication_before_send_event,  // before_send_event,
  group_replication_after_send_event,   // after_send_event,
  group_replication_reset_master_logs   // reset_master
};

五、其它的初始化操作

  //Initialize the recovery SSL option map
  initialize_ssl_option_map();

  //Initialize channel observation and auto increment handlers before start
  auto_increment_handler= new Plugin_group_replication_auto_increment();
  channel_observation_manager= new Channel_observation_manager(plugin_info);
  view_change_notifier= new Plugin_gcs_view_modification_notifier();
  gcs_module= new Gcs_operations();

  //Initialize the compatibility module before starting
  init_compatibility_manager();

另外如果开启了初始化时启动mgr,则会调用启动函数

 plugin_is_auto_starting= start_group_replication_at_boot_var;
  if (start_group_replication_at_boot_var && plugin_group_replication_start())
  {
    log_message(MY_ERROR_LEVEL,
                "Unable to start Group Replication on boot");
  }

网友评论

登录后评论
0/500
评论
sun_ashe
+ 关注