Multiprocessing and Multithreading in Python OJAI Applications
Python provides the `multiprocessing`, `thread`, and `threading` modules, which enable you to spawn either multiple processes or multiple threads in a Python program. This section contains examples that show you how to use these modules in your Python OJAI application.
Multiprocessing in Python OJAI Applications
The following code example spawns multiple processes using the Python `multiprocessing` module. When you use this module, you must create a separate OJAI connection for each process, because a connection cannot be shared across processes.
The code example is available at 014_multiprocessing_example.py.
"""Following example works with Python Client.

Create a connection, get store, insert_or_replace/update document in store
via multiprocessing.
"""
import multiprocessing

from mapr.ojai.storage.ConnectionFactory import ConnectionFactory

# Connection string in the form <host>:<port>?<option>=<value>;...
connection_str = "localhost:5678?auth=basic;user=mapr;password=mapr;" \
                 "ssl=true;" \
                 "sslCA=/opt/mapr/conf/ssl_truststore.pem;" \
                 "sslTargetNameOverride=node1.mapr.com"


def sample():
    """Insert 15 documents into the store and then update each of them.

    Used as the Pool initializer, so it runs exactly once in every worker
    process. A connection cannot be shared between processes, so each
    process opens (and closes) its own.
    """
    connection = ConnectionFactory().get_connection(connection_str=connection_str)
    try:
        # Get a store and assign it as a DocumentStore object
        store = connection.get_or_create_store('/tmp/store_name')

        # Insert 15 documents, represented as Python dictionaries,
        # into DocumentStore
        for i in range(15):
            store.insert_or_replace(doc={'_id': str(i), 'name': 'Greg'})

        # Create DocumentMutation object using the OJAIConnection object
        mutation = connection.new_mutation()

        # Set mutation value
        mutation.set_or_replace(field_path='name', value='T')

        # Update the 15 documents in the store
        for i in range(15):
            store.update(_id=str(i), mutation=mutation)
    finally:
        # Release this process's connection even if an operation failed
        connection.close()


def run(unused_var):
    """No-op Pool task; the real work is done by ``sample`` as initializer."""
    pass


# Number of worker processes and the (trivial) iterable mapped over them
process_count = 7
map_iterable = [1]

# The guard is required when worker processes are started with the "spawn"
# method: each child re-imports this module and must not create a new Pool.
if __name__ == '__main__':
    # Create Pool object; every worker runs sample() once on start-up
    p = multiprocessing.Pool(process_count, initializer=sample)
    try:
        # Run processes from the Pool
        p.map(run, map_iterable)
    finally:
        # map() returns as soon as the single no-op task is done, so wait
        # explicitly until every worker has finished its initializer;
        # otherwise the pool may be torn down while workers are still writing.
        p.close()
        p.join()
Multithreading in Python OJAI Applications
You can use either the Python `thread` or `threading` module to spawn multiple threads in your Python application. (The `thread` module exists only in Python 2; in Python 3 it was renamed `_thread`.) When you use these modules, you can share a single OJAI connection across threads.
The following code example uses the `thread` module. It is available at 015_thread_example.py.
"""Following example works with Python Client.

Create a connection, get store, insert_or_replace/update document in store
via thread using same connection.
"""
try:
    import thread  # Python 2
except ImportError:
    import _thread as thread  # the module was renamed in Python 3

from mapr.ojai.storage.ConnectionFactory import ConnectionFactory


def run_thread(name, conn):
    """Insert 15 documents into the store and then update each of them.

    :param name: label printed when the thread starts and finishes.
    :param conn: OJAI connection shared by all threads.
    """
    # Print that thread started with threadName
    print('\n Start thread ', name)

    # Get a store and assign it as a DocumentStore object
    store = conn.get_or_create_store('/tmp/store_name')

    # Insert 15 documents, represented as Python dictionaries,
    # into DocumentStore
    for index in range(15):
        store.insert_or_replace(doc={'_id': str(index), 'name': 'Greg'})

    # Create DocumentMutation object using the OJAIConnection object
    mutation = conn.new_mutation()

    # Set mutation value
    mutation.set_or_replace(field_path='name', value='T')

    # Update the 15 documents in the store
    for index in range(15):
        store.update(_id=str(index), mutation=mutation)

    # Print that thread done with threadName
    print('\n Done thread ', name)


def _run_and_signal(done_lock, name, conn):
    """Run ``run_thread`` and release ``done_lock`` when it ends or fails."""
    try:
        run_thread(name, conn)
    finally:
        done_lock.release()


# Connection string in the form <host>:<port>?<option>=<value>;...
connection_str = "localhost:5678?auth=basic;user=mapr;password=mapr;" \
                 "ssl=true;" \
                 "sslCA=/opt/mapr/conf/ssl_truststore.pem;" \
                 "sslTargetNameOverride=node1.mapr.com"

# The connection can be shared between threads,
# so only one connection instance is needed for all of them
connection = ConnectionFactory.get_connection(connection_str)

try:
    # start_new_thread() does not return a thread object, so each thread is
    # given a lock that it releases on completion. Waiting on those locks
    # replaces a fixed sleep, which could close the connection too early
    # (or wait longer than necessary).
    done_locks = []

    # Create 10 threads using the same connection instance
    for i in range(10):
        thread_name = 'Thread-{}'.format(str(i))
        done_lock = thread.allocate_lock()
        done_lock.acquire()
        thread.start_new_thread(_run_and_signal,
                                (done_lock, thread_name, connection))
        done_locks.append(done_lock)

    # Block until every thread has released its lock
    for done_lock in done_locks:
        done_lock.acquire()
finally:
    # Close connection
    connection.close()
The following code example uses the `threading` module. It is available at 016_threading_example.py.
"""Following example works with Python Client.

Create a connection, get store, insert_or_replace/update document in store
via thread using same connection.
"""
import threading

from mapr.ojai.storage.ConnectionFactory import ConnectionFactory

# Connection string in the form <host>:<port>?<option>=<value>;...
connection_str = "localhost:5678?auth=basic;user=mapr;password=mapr;" \
                 "ssl=true;" \
                 "sslCA=/opt/mapr/conf/ssl_truststore.pem;" \
                 "sslTargetNameOverride=node1.mapr.com"

# The connection can be shared between threads,
# so only one connection instance is needed for all of them
connection = ConnectionFactory.get_connection(connection_str)


class MyThread(threading.Thread):
    """Thread that inserts 15 documents into the store and updates them."""

    def __init__(self, name, connection):
        """Store the thread name and the shared OJAI connection.

        :param name: thread name, printed on start and finish.
        :param connection: OJAI connection shared by all threads.
        """
        threading.Thread.__init__(self)
        self.name = name
        self.connection = connection

    def run(self):
        """Insert 15 documents and then apply the same mutation to each."""
        # Print that thread started with threadName
        print('\n Start thread ', self.name)

        # Use the connection handed to this thread rather than the
        # module-level global, so the class works with any connection
        store = self.connection.get_or_create_store('/tmp/store_name')

        # Insert 15 documents, represented as Python dictionaries,
        # into DocumentStore
        for index in range(15):
            store.insert_or_replace(doc={'_id': str(index), 'name': 'Greg'})

        # Create DocumentMutation object using the OJAIConnection object
        mutation = self.connection.new_mutation()

        # Set mutation value
        mutation.set_or_replace(field_path='name', value='T')

        # Update the 15 documents in the store
        for index in range(15):
            store.update(_id=str(index), mutation=mutation)

        # Print that thread done with threadName
        print('\n Done thread ', self.name)


def waiter(threads):
    """Block until every thread in ``threads`` has finished.

    threading returns thread objects, so completion can be awaited with
    join() instead of polling is_alive() in a sleep loop.
    """
    for my_thread in threads:
        my_thread.join()


# Create list instance for storing created thread objects
thread_list = []

try:
    # Create and run 10 threads
    for i in range(10):
        # Create thread instance using MyThread and OJAIConnection object
        worker = MyThread(name='Thread-{0}'.format(str(i)),
                          connection=connection)

        # Start current thread
        worker.start()

        # Append thread object into thread_list
        thread_list.append(worker)
finally:
    # Wait until all started threads have finished, then close the
    # connection, even if starting one of the threads failed
    waiter(thread_list)
    connection.close()