Refactor GOP daemon services

This post introduced how GOP refactor all daemon services with a common lib to reuse code and reduce maintenance cost.

This post introduced how GOP refactor all daemon services with a common lib to reuse code and reduce maintenance cost.

Background

In GOP system, there are quite a few daemon services that run in the background to do certain tasks. For instance:

  1. Update user game account
  2. Update user login grant time
  3. Send user payment notification to game server
  4. Check payment status from payment channels
  5. Archive old data etc.
    Since the services were written by different developers, the coding style varies from each other, which makes them very hard to maintain along the way.
    Finally we can hardly stand for it and decide to refactor them with cleaner code.

Find Out the Pattern

Despite the daemon services are running for different purposes, they do have something in common. They all consume tasks from certain data source (MySQL, Kafka etc.).

Let’s Start the Refactoring

Abstract Base Class

To make it easier, I removed some code, only kept the key parts.

class BaseConsumer(object):  
  
  SLEEP_TIME = 1  
  DEFAULT_RETRY_TIMES = 0  
  DEFAULT_WORKER_COUNT = 1  
  RETRY_BASIC_INTERVAL = 0.1  
  RELOAD_INTERVAL = 60 * 5  
  
  def __init__(self, worker_id, **kwargs):  
	  # here kwargs is the settings for different service
	  self.worker_id = worker_id  
      self.config = kwargs  
      self.worker_count = self.config.get('workers', self.DEFAULT_WORKER_COUNT)  
	  self.task_retry_times = self.config.get('task_retry_times', self.DEFAULT_RETRY_TIMES)  
	  self.reload_interval = self.config.get('reload_interval', self.RELOAD_INTERVAL)  
	  self.consume_client = None  
	  self.number_of_consumed = 0  
  
  def consume(self):  
	  procname.setprocname(self.get_process_name())  
	  self.pre_consume()  
	  try:  
		  latest_reload_timestamp = 0  
		  for offset, record in self.iterate_task():  
			  now = get_timestamp()  
			  if now - latest_reload_timestamp > self.reload_interval:  
				  self.reload_config()  
				  latest_reload_timestamp = now  
	          if self.can_stop_consume():  
				  break  
			  self.pre_handle_task()  
			  tried = 0  
			  while tried <= self.task_retry_times:  
				  try:  
					  self.handle_task(offset=offset, record=record)  
					  break  
				  except:  
					  log.exception('handle_data_error|offset=%d,record=%s,worker_id=%s', offset, record, self.worker_id)  
					  time.sleep(self.RETRY_BASIC_INTERVAL * tried)  
					  tried += 1  
			  self.post_handle_task()  
	  except Exception:  
		  log.exception('consume_exception')  
	  self.post_consume()  
  
  def iterate_task(self): 
	  raise NotImplementedError()  
  
 def handle_task(self, **kwargs):  
	  raise NotImplementedError()  

The BaseConsumer is highly abstract, it accepts the params from outside and handle every single task from the data source. We have also implemented dynamic config reload and basic error retry mechanism there too.
Subclasses which extends from BaseConsumer will take charge to implement 2 key methods:
iterate_task to get tasks from the data source.
handle_task to process the task.

Kafka and MySQL Subclasses

In order to reuse the code as much as possible, we will create KafkaBaseConsumer and MySQLBaseConsumer, and implemented iterate_task defined in BaseConsumer.

class KafkaBaseConsumer(BaseConsumer):  
  
  def iterate_task(self):  
	  for offset, value in self.consume_client.consume():  
		  record = from_json(value)  
		  yield offset, record

class MySQLBaseConsumer(BaseConsumer):  
  
  def get_next_batch(self):
	  # to be override by subclass, query data from specific tables.
	  raise NotImplementedError()  
  
  def iterate_task(self):  
	  while True:  
		  task_list = self.get_next_batch()  
		  if not task_list:  
			  time.sleep(self.SLEEP_TIME)  
			  continue  
		  for _, task in enumerate(task_list):  
			  yield _, task

The Worker Consumer

Now we can start to implement our consumers. Maybe you already noticed that we have not implement handle_task yet. This should be done in the worker consumer.

In my last post How GOP Handle Traffic Spikes, the consumer used to update user login grant time is actually using KafkaBaseConsumer

class LoginConsumer(KafkaBaseConsumer):  
  
  def handle_task(self, offset, record):  
	  refresh_db_connections()  
	  platform = record['platform']  
	  uid = record['uid']  
	  app_id = record['app_id']  
	  last_use_time = record['now']  
	  grant_manager.update_grant_use_time(platform, uid, app_id, last_use_time)  
	  log.data('update_grant_use_time|offset=%d,record=%s,worker_id=%s', offset, record, self.worker_id)

Here is another example using MySQLBaseConsumer

class AppEventDBConsumer(MySQLBaseConsumer):  
  
  def __init__(self, worker_id, **kwargs):  
	  self.timeout = 10  
	  self.back_off_time = 10  
	  self.max_back_off_time = 2 * 60 * 60 # 2 hours  
	  self.expire_time = 4 * 24 * 60 * 60  
	  self.app_id = kwargs.get('app_id')  
	  self.app = None   
	  super(AppEventDBConsumer, self).__init__(worker_id, **kwargs)  
  
  def handle_task(self, offset, record):  
	  # handle task here ...

  def get_next_batch(self):  
	  # get data from MySQL
	  return app_event_manager.get_fresh_event_by_app_id(self.app_id, settings.FRESH_RETRY_THRESHOLD)

How to Manage the Worker

Now we have done all the preparation, it is time to run our consumers. Instead of running it simply, we want to make it more flexible. Here are something we need to care about:

  1. We want to control the number of the workers, and the master worker should be able to adjust the worker count dynamically
  2. Considered that we have multiple consumers shared the same lib, we want to better control the consumer by config files
  3. Master worker should be aware of the status of all the sub workers and restart it if sub worker is down.

Then we have our JSON format config as below:

KAFKA_SERVER_SETTING = {  
  'gop_consumer': [{  
	  'module': 'gop_consumer.consumers.login_consumer',  # the python file where consumer stays
	  'class': 'LoginConsumer',  # the exact consumer class
	  'configs': {  
		  'workers': 16, # number of workers
		  'brokers': 'broker-1:9092,broker-2:9092,broker-3:9092',  
		  'process_name': 'gop_consumer',  # process name
		  'topic_name': 'login_task',  # kafka topic
		  'consumer_group': 'login_consumer',  # kafka consumer group
		  }  
	 }]  
}

The config is loaded in master consumer and it will spawn specific number of workers to consume data.

def run(self):  
  while True:  
	  self.reload_config() 
	  for master_id, consumer_class_list in self.config.items():  
		  if master_id in self.running_processes:  
			  continue  
  
	  for consumer_settings in consumer_class_list:  
		  try:  
			  module_name = consumer_settings['module']  
			  consumer_name = consumer_settings['class']  
			  configs = consumer_settings['configs']  
			  workers = configs.get('workers', 1)  
			  module_lib = importlib.import_module(module_name)  
			  consumer_class = getattr(module_lib, consumer_name)  
			  for worker_id in xrange(0, workers):  
				  # spawn workers
				  self.spawn_process(consumer_class, master_id, worker_id, configs)  
			  except:  
				  log.exception('consumer_processor_spwn_exception')  
				  
	  # check and restart process if it is dead
	  ...
	  time.sleep(self.WORKER_SCAN_INTERVAL)

def spawn_process(self, consumer_class, master_id, worker_id, configs, is_new=True):  
	  self.pre_spawn_process()  
	  process = Process(target=consumer_class(worker_id, **configs).consume, args=())  
	  process.start()  
	  log.data('master_id=%s,consumer_class=%s,worker_id=%s', master_id, consumer_class, worker_id)

What’s next?

Now we have already refactored most of our daemon services with the above lib, everything works well so far in our production environment. We will continue to add more mechanism in the following months.

  1. Add logic to handle SIGTERM signal and support gracefully shutdown or restart of workers
  2. Support sharding in worker consumer if there is multiple workers.