Background
In the GOP system, quite a few daemon services run in the background to perform specific tasks, for instance:
- Update user game account
- Update user login grant time
- Send user payment notification to game server
- Check payment status from payment channels
- Archive old data, etc.
Because the services were written by different developers, their coding styles vary widely, which makes them hard to maintain over time.
Eventually we could no longer put up with it and decided to refactor them into cleaner code.
Find Out the Pattern
Although the daemon services serve different purposes, they have something in common: they all consume tasks from some data source (MySQL, Kafka, etc.).
Let’s Start the Refactoring
Abstract Base Class
For readability, I removed some code and kept only the key parts.
import time

# procname, get_timestamp and log are internal helpers that are not shown here.
# The hooks get_process_name(), pre_consume(), reload_config(), can_stop_consume(),
# pre_handle_task(), post_handle_task() and post_consume() were removed for brevity.


class BaseConsumer(object):
    SLEEP_TIME = 1
    DEFAULT_RETRY_TIMES = 0
    DEFAULT_WORKER_COUNT = 1
    RETRY_BASIC_INTERVAL = 0.1
    RELOAD_INTERVAL = 60 * 5

    def __init__(self, worker_id, **kwargs):
        # kwargs holds the settings of the specific service
        self.worker_id = worker_id
        self.config = kwargs
        self.worker_count = self.config.get('workers', self.DEFAULT_WORKER_COUNT)
        self.task_retry_times = self.config.get('task_retry_times', self.DEFAULT_RETRY_TIMES)
        self.reload_interval = self.config.get('reload_interval', self.RELOAD_INTERVAL)
        self.consume_client = None
        self.number_of_consumed = 0

    def consume(self):
        procname.setprocname(self.get_process_name())
        self.pre_consume()
        try:
            latest_reload_timestamp = 0
            for offset, record in self.iterate_task():
                # reload the config at most once every reload_interval
                now = get_timestamp()
                if now - latest_reload_timestamp > self.reload_interval:
                    self.reload_config()
                    latest_reload_timestamp = now
                if self.can_stop_consume():
                    break
                self.pre_handle_task()
                # one attempt plus up to task_retry_times retries, sleeping longer each time
                tried = 0
                while tried <= self.task_retry_times:
                    try:
                        self.handle_task(offset=offset, record=record)
                        break
                    except Exception:
                        log.exception('handle_data_error|offset=%d,record=%s,worker_id=%s', offset, record, self.worker_id)
                        time.sleep(self.RETRY_BASIC_INTERVAL * tried)
                        tried += 1
                self.post_handle_task()
        except Exception:
            log.exception('consume_exception')
        self.post_consume()

    def iterate_task(self):
        # yield (offset, record) pairs from the data source
        raise NotImplementedError()

    def handle_task(self, **kwargs):
        # process a single record
        raise NotImplementedError()
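BaseConsumer is highly abstract: it accepts its settings from the outside and handles every single task from the data source. It also implements dynamic config reloading (at most once every reload_interval) and a basic retry mechanism (up to task_retry_times retries per task, sleeping a little longer before each one).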
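To see the retry behaviour in isolation, here is a small standalone sketch of the loop inside consume(). The helper run_with_retry and the flaky handler are hypothetical names for illustration. Like the original, it makes one attempt plus up to retry_times retries and sleeps basic_interval * attempt between them (so the first retry is immediate); unlike the original, it skips the pointless sleep after the final failure.

```python
import logging
import time

log = logging.getLogger(__name__)


def run_with_retry(func, retry_times=0, basic_interval=0.1, **kwargs):
    """Call func(**kwargs) up to retry_times + 1 times.

    Sleeps basic_interval * attempt between attempts (0, 1x, 2x, ...),
    mirroring the linear back-off in BaseConsumer.consume().
    Returns True on success, False if every attempt raised.
    """
    for attempt in range(retry_times + 1):
        try:
            func(**kwargs)
            return True
        except Exception:
            log.exception('handle_data_error|attempt=%d', attempt)
            if attempt < retry_times:
                # No point sleeping after the last attempt.
                time.sleep(basic_interval * attempt)
    return False


if __name__ == '__main__':
    calls = []

    def flaky(record):
        # Hypothetical task handler: fails twice, then succeeds.
        calls.append(record)
        if len(calls) < 3:
            raise ValueError('transient error')

    print(run_with_retry(flaky, retry_times=2, basic_interval=0.01, record={'uid': 1}))  # prints True
    print(len(calls))  # prints 3
```

With retry_times=0 (the BaseConsumer default) a failing task is attempted exactly once and then skipped.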
Subclasses that extend BaseConsumer are responsible for implementing two key methods:
- iterate_task, to fetch tasks from the data source.
- handle_task, to process each task.
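This split is the classic template method pattern: the base class owns the loop and each subclass fills in the two hooks. Below is a minimal sketch, assuming an in-memory list as the data source; MiniBaseConsumer and ListConsumer are hypothetical classes, and the retry and reload logic is left out.

```python
class MiniBaseConsumer(object):
    """Stripped-down stand-in for BaseConsumer (retry and reload omitted)."""

    def __init__(self, worker_id, **kwargs):
        self.worker_id = worker_id
        self.config = kwargs

    def consume(self):
        # The base class owns the loop; subclasses only supply the two hooks.
        for offset, record in self.iterate_task():
            self.handle_task(offset=offset, record=record)

    def iterate_task(self):
        raise NotImplementedError()

    def handle_task(self, **kwargs):
        raise NotImplementedError()


class ListConsumer(MiniBaseConsumer):
    """Hypothetical subclass that reads tasks from an in-memory list."""

    def __init__(self, worker_id, **kwargs):
        super(ListConsumer, self).__init__(worker_id, **kwargs)
        self.results = []

    def iterate_task(self):
        for offset, record in enumerate(self.config.get('tasks', [])):
            yield offset, record

    def handle_task(self, offset, record):
        self.results.append((offset, record.upper()))


if __name__ == '__main__':
    consumer = ListConsumer(0, tasks=['a', 'b'])
    consumer.consume()
    print(consumer.results)  # prints [(0, 'A'), (1, 'B')]
```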
Kafka and MySQL Subclasses
To reuse as much code as possible, we created KafkaBaseConsumer and MySQLBaseConsumer, each of which implements the iterate_task method declared in BaseConsumer.
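import time

# from_json is a project helper (not shown) that decodes a message value.


class KafkaBaseConsumer(BaseConsumer):
    def iterate_task(self):
        # consume_client yields (offset, raw message) pairs from the topic
        for offset, value in self.consume_client.consume():
            record = from_json(value)
            yield offset, record


class MySQLBaseConsumer(BaseConsumer):
    def get_next_batch(self):
        # to be overridden by subclasses: query data from the specific tables
        raise NotImplementedError()

    def iterate_task(self):
        # poll forever; sleep SLEEP_TIME seconds whenever there is nothing to do
        while True:
            task_list = self.get_next_batch()
            if not task_list:
                time.sleep(self.SLEEP_TIME)
                continue
            # the "offset" here is only the index within the current batch
            for index, task in enumerate(task_list):
                yield index, task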
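Because MySQLBaseConsumer simply polls, a subclass has to make sure processed rows are not returned by the next get_next_batch() call. The sketch below illustrates this with SQLite from the standard library standing in for MySQL; the tasks table, its status column, SQLitePollingConsumer, make_demo_db and the max_idle_polls stop condition are all assumptions for the demo, not part of the original code.

```python
import sqlite3
import time


class SQLitePollingConsumer(object):
    """Hypothetical stand-in for a MySQLBaseConsumer subclass, backed by SQLite."""

    SLEEP_TIME = 0.01

    def __init__(self, conn, batch_size=2, max_idle_polls=3):
        self.conn = conn
        self.batch_size = batch_size
        # Demo-only stop condition: the real iterate_task() loops forever.
        self.max_idle_polls = max_idle_polls
        self.handled = []

    def get_next_batch(self):
        # Only pending rows are selected, so processed rows drop out.
        return self.conn.execute(
            "SELECT id, payload FROM tasks WHERE status = 'pending' "
            "ORDER BY id LIMIT ?", (self.batch_size,)).fetchall()

    def iterate_task(self):
        idle_polls = 0
        while idle_polls < self.max_idle_polls:
            task_list = self.get_next_batch()
            if not task_list:
                idle_polls += 1
                time.sleep(self.SLEEP_TIME)
                continue
            idle_polls = 0
            for index, task in enumerate(task_list):
                yield index, task

    def handle_task(self, offset, record):
        task_id, payload = record
        self.handled.append(payload)
        # Without this update, the next poll would return the same rows again.
        self.conn.execute("UPDATE tasks SET status = 'done' WHERE id = ?", (task_id,))
        self.conn.commit()

    def consume(self):
        for offset, record in self.iterate_task():
            self.handle_task(offset=offset, record=record)


def make_demo_db(payloads):
    conn = sqlite3.connect(':memory:')
    conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, payload TEXT, "
                 "status TEXT NOT NULL DEFAULT 'pending')")
    conn.executemany("INSERT INTO tasks (payload) VALUES (?)", [(p,) for p in payloads])
    conn.commit()
    return conn


if __name__ == '__main__':
    consumer = SQLitePollingConsumer(make_demo_db(['a', 'b', 'c', 'd', 'e']))
    consumer.consume()
    print(consumer.handled)  # prints ['a', 'b', 'c', 'd', 'e']
```

Note that, as in MySQLBaseConsumer, the offset passed to handle_task restarts at 0 for every batch, so it should not be treated as a global position.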
The Worker Consumer
Now we can implement the actual consumers. You may have noticed that handle_task has not been implemented yet; that is the job of each worker consumer.
In my last post, How GOP Handle Traffic Spikes, the consumer that updates the user login grant time is built on KafkaBaseConsumer:
class LoginConsumer(KafkaBaseConsumer):
    def handle_task(self, offset, record):
        # refresh_db_connections, grant_manager and log are project helpers (not shown)
        refresh_db_connections()
        platform = record['platform']
        uid = record['uid']
        app_id = record['app_id']
        last_use_time = record['now']
        grant_manager.update_grant_use_time(platform, uid, app_id, last_use_time)
        log.data('update_grant_use_time|offset=%d,record=%s,worker_id=%s', offset, record, self.worker_id)
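To exercise the same flow without a Kafka cluster, the sketch below swaps in an in-memory client. FakeKafkaClient, JSONStreamConsumer and InMemoryLoginConsumer are hypothetical; json.loads stands in for from_json, and a dict stands in for grant_manager, assuming that a later message for the same (platform, uid, app_id) simply overwrites the earlier use time.

```python
import json


class FakeKafkaClient(object):
    """Hypothetical in-memory stand-in for consume_client."""

    def __init__(self, messages):
        self.messages = messages

    def consume(self):
        # Yield (offset, raw JSON string) pairs, as the real client is assumed to.
        for offset, value in enumerate(self.messages):
            yield offset, value


class JSONStreamConsumer(object):
    """Mirrors KafkaBaseConsumer.iterate_task(), with json.loads as from_json."""

    def __init__(self, client):
        self.consume_client = client

    def iterate_task(self):
        for offset, value in self.consume_client.consume():
            yield offset, json.loads(value)

    def consume(self):
        for offset, record in self.iterate_task():
            self.handle_task(offset=offset, record=record)


class InMemoryLoginConsumer(JSONStreamConsumer):
    """Hypothetical LoginConsumer where a dict replaces grant_manager."""

    def __init__(self, client):
        super(InMemoryLoginConsumer, self).__init__(client)
        self.grant_use_time = {}

    def handle_task(self, offset, record):
        key = (record['platform'], record['uid'], record['app_id'])
        # Assumption: the newest message simply overwrites the stored use time.
        self.grant_use_time[key] = record['now']


if __name__ == '__main__':
    consumer = InMemoryLoginConsumer(FakeKafkaClient([
        '{"platform": 1, "uid": 42, "app_id": 100, "now": 1600000000}',
        '{"platform": 1, "uid": 42, "app_id": 100, "now": 1600000060}',
    ]))
    consumer.consume()
    print(consumer.grant_use_time)  # prints {(1, 42, 100): 1600000060}
```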
Here is another example, built on MySQLBaseConsumer:
class AppEventDBConsumer(MySQLBaseConsumer):
    def __init__(self, worker_id, **kwargs):
        self.timeout = 10
        self.back_off_time = 10
        self.max_back_off_time = 2 * 60 * 60  # 2 hours
        self.expire_time = 4 * 24 * 60 * 60  # 4 days
        self.app_id = kwargs.get('app_id')
        self.app = None
        super(AppEventDBConsumer, self).__init__(worker_id, **kwargs)

    def handle_task(self, offset, record):
        # handle task here ...
        pass

    def get_next_batch(self):
        # get data from MySQL (app_event_manager and settings are project modules)
        return app_event_manager.get_fresh_event_by_app_id(self.app_id, settings.FRESH_RETRY_THRESHOLD)
How to Manage the Workers
With the preparation done, it is time to run our consumers. Rather than simply starting them, we want the setup to be flexible. Here is what we need to take care of:
- We want to control the number of workers, and the master worker should be able to adjust the worker count dynamically.
- Since multiple consumers share the same library, we want to control each consumer through config files.
- The master worker should be aware of the status of all sub workers and restart any sub worker that goes down.
Each consumer is described by a JSON-like config entry, written as a Python dict:
KAFKA_SERVER_SETTING = {
    'gop_consumer': [{  # master_id
        'module': 'gop_consumer.consumers.login_consumer',  # the Python file where the consumer lives
        'class': 'LoginConsumer',  # the exact consumer class
        'configs': {
            'workers': 16,  # number of workers
            'brokers': 'broker-1:9092,broker-2:9092,broker-3:9092',
            'process_name': 'gop_consumer',  # process name
            'topic_name': 'login_task',  # Kafka topic
            'consumer_group': 'login_consumer',  # Kafka consumer group
        }
    }]
}
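The master turns each config entry into a class object with importlib.import_module and getattr. Here is a standalone sketch of just that resolution step; load_consumer_class and plan_workers are hypothetical helpers, and collections.OrderedDict is only a placeholder for a real consumer class such as LoginConsumer.

```python
import importlib


def load_consumer_class(consumer_settings):
    """Resolve the 'module' and 'class' entries of one config item to a class object."""
    module_lib = importlib.import_module(consumer_settings['module'])
    return getattr(module_lib, consumer_settings['class'])


def plan_workers(server_setting):
    """Yield (master_id, consumer_class, worker_id, configs) for every worker to spawn."""
    for master_id, consumer_class_list in server_setting.items():
        for consumer_settings in consumer_class_list:
            consumer_class = load_consumer_class(consumer_settings)
            configs = consumer_settings['configs']
            for worker_id in range(configs.get('workers', 1)):
                yield master_id, consumer_class, worker_id, configs


if __name__ == '__main__':
    demo_setting = {
        'demo_master': [{
            'module': 'collections',  # placeholder module
            'class': 'OrderedDict',   # placeholder "consumer" class
            'configs': {'workers': 3},
        }],
    }
    for master_id, consumer_class, worker_id, configs in plan_workers(demo_setting):
        print(master_id, consumer_class.__name__, worker_id)
```

Keeping the module path and class name as strings is what lets one generic master run any consumer without importing it up front.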
The master loads the config and spawns the configured number of workers for each consumer:
import importlib
import time
from multiprocessing import Process

# run() and spawn_process() are methods of the master class (not shown in full);
# log, running_processes, WORKER_SCAN_INTERVAL and pre_spawn_process() are defined elsewhere.

def run(self):
    while True:
        self.reload_config()
        for master_id, consumer_class_list in self.config.items():
            if master_id in self.running_processes:
                continue
            for consumer_settings in consumer_class_list:
                try:
                    module_name = consumer_settings['module']
                    consumer_name = consumer_settings['class']
                    configs = consumer_settings['configs']
                    workers = configs.get('workers', 1)
                    # resolve the consumer class from its module path
                    module_lib = importlib.import_module(module_name)
                    consumer_class = getattr(module_lib, consumer_name)
                    for worker_id in range(workers):
                        # spawn workers
                        self.spawn_process(consumer_class, master_id, worker_id, configs)
                except Exception:
                    log.exception('consumer_processor_spwn_exception')
        # check and restart process if it is dead
        ...
        time.sleep(self.WORKER_SCAN_INTERVAL)

def spawn_process(self, consumer_class, master_id, worker_id, configs, is_new=True):
    self.pre_spawn_process()
    # the consumer instance is built in the master; its consume() runs in the child process
    process = Process(target=consumer_class(worker_id, **configs).consume, args=())
    process.start()
    log.data('master_id=%s,consumer_class=%s,worker_id=%s', master_id, consumer_class, worker_id)
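The code above elides the step that checks for dead workers and restarts them. The sketch below shows one possible way to do it with the standard multiprocessing module; WorkerSupervisor and its scan() method are hypothetical and not GOP's implementation. It keys processes by worker id, so lowering worker_count (for example after a config reload) also stops the surplus workers.

```python
import multiprocessing


class WorkerSupervisor(object):
    """Hypothetical master-side helper that keeps worker_count processes alive.

    target(worker_id) is the worker entry point, e.g. a function that builds
    the consumer for that worker id and calls its consume() method.
    """

    def __init__(self, target, worker_count, ctx=None):
        self.target = target
        self.worker_count = worker_count
        self.ctx = ctx or multiprocessing.get_context()
        self.processes = {}  # worker_id -> Process
        self.restarts = 0

    def _start(self, worker_id):
        process = self.ctx.Process(target=self.target, args=(worker_id,), daemon=True)
        process.start()
        self.processes[worker_id] = process

    def scan(self):
        """Start missing workers, restart dead ones and stop surplus ones."""
        for worker_id in range(self.worker_count):
            process = self.processes.get(worker_id)
            if process is None:
                self._start(worker_id)
            elif not process.is_alive():
                process.join()  # reap the dead child before replacing it
                self.restarts += 1
                self._start(worker_id)
        # worker_count may have been lowered, e.g. after a config reload
        for worker_id in [w for w in self.processes if w >= self.worker_count]:
            process = self.processes.pop(worker_id)
            process.terminate()
            process.join()

    def stop_all(self):
        for process in self.processes.values():
            process.terminate()
            process.join()
        self.processes.clear()
```

In a loop like run() above, the master would call scan() once every WORKER_SCAN_INTERVAL seconds. Joining a finished child before replacing it keeps it from lingering as a zombie process.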
What’s next?
We have now refactored most of our daemon services on top of this library, and so far everything works well in production. Over the following months we plan to add more mechanisms:
- Handle the SIGTERM signal to support graceful shutdown and restart of workers.
- Support sharding in worker consumers when there are multiple workers.
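For the first item, one common approach is to have the SIGTERM handler only set a flag and let the consume loop exit at a safe point. Here is a minimal sketch with the standard signal module, assuming the flag is wired into the existing can_stop_consume() hook; GracefulStopper is a hypothetical name.

```python
import signal


class GracefulStopper(object):
    """Hypothetical helper: turns SIGTERM into a flag the consume loop can poll.

    Install it in the worker process; BaseConsumer.can_stop_consume() could then
    simply return stopper.stop_requested.
    """

    def __init__(self):
        self.stop_requested = False

    def install(self, signum=signal.SIGTERM):
        # Python only allows installing handlers from the main thread.
        signal.signal(signum, self._handle)

    def _handle(self, signum, frame):
        # Only set a flag here; the loop finishes the current task and then exits.
        self.stop_requested = True
```

Since consume() calls can_stop_consume() once per task, a worker blocked waiting for the next Kafka message, or sleeping in the MySQLBaseConsumer polling loop, would only notice the flag when the next task arrives.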