Multiprocessing and Multithreading in Python OJAI Applications

Python provides the multiprocessing module and the thread and threading modules, which enable you to spawn multiple processes or multiple threads, respectively, in a Python program. This section contains examples that show you how to use these modules in your Python OJAI application.

Multiprocessing in Python OJAI Applications

The following code example spawns multiple processes using the Python multiprocessing module. When you use the module, you must create a separate OJAI connection for each process.

The code example is available at 014_multiprocessing_example.py.

"""Following example works with Python Client"""

import multiprocessing
from mapr.ojai.storage.ConnectionFactory import ConnectionFactory

"""Create a connection, get store, insert_or_replace/update document in
 store via multiprocessing"""

# Build the OJAI connection string: host:port followed by the
# authentication, credential and SSL options
connection_str = (
    "localhost:5678?auth=basic;user=mapr;password=mapr;"
    "ssl=true;"
    "sslCA=/opt/mapr/conf/ssl_truststore.pem;"
    "sslTargetNameOverride=node1.mapr.com"
)


# Create method which will be used for multiprocessing
def sample():
    """Insert and then update 15 documents over a process-local connection.

    Passed as the ``initializer`` of the worker pool, so it runs once in
    every worker process. The connection is opened from the module-level
    ``connection_str`` and is always closed before the function returns.
    """
    # A connection cannot be shared between processes,
    # so each process opens its own one.
    connection = ConnectionFactory().get_connection(
        connection_str=connection_str)
    try:
        # Get a store and assign it as a DocumentStore object
        store = connection.get_or_create_store('/tmp/store_name')

        # Insert 15 documents, represented as Python dictionaries,
        # into DocumentStore
        for i in range(15):
            store.insert_or_replace(doc={'_id': str(i), 'name': 'Greg'})

        # Create DocumentMutation object using the OJAIConnection object
        mutation = connection.new_mutation()

        # Set mutation value
        mutation.set_or_replace(field_path='name', value='T')

        # Update the 15 documents in the store
        for i in range(15):
            store.update(_id=str(i), mutation=mutation)
    finally:
        # Release the per-process connection even if an operation failed
        connection.close()


# Create a simple no-op method to run as the task in the Pool processes
def run(unused_var):
    """Do nothing; placeholder task handed to ``Pool.map``.

    :param unused_var: item taken from the mapped iterable, ignored
    """
    return None


# Guard the entry point: with the "spawn" start method every worker
# re-imports this module, and an unguarded Pool creation would be
# executed again inside each worker.
if __name__ == '__main__':
    # Number of worker processes to spawn
    process_count = 7
    # Single dummy task; the real work is done by the Pool initializer
    map_iterable = [1]  # simple iterator

    # Create Pool object; sample() runs once in every worker process
    p = multiprocessing.Pool(process_count, initializer=sample)
    try:
        # Run processes from the Pool
        p.map(run, map_iterable)
    finally:
        # Stop accepting work and wait for all workers to exit, so that
        # no worker is terminated while its initializer is still running
        p.close()
        p.join()

Multithreading in Python OJAI Applications

You can use either the Python thread or threading module to spawn multiple threads in your Python application. When you use these modules, you can share an OJAI connection across threads.

The following code example uses the thread module (named _thread in Python 3). It is available at 015_thread_example.py.

"""Following example works with Python Client"""
import thread
import time
from mapr.ojai.storage.ConnectionFactory import ConnectionFactory

"""Create a connection, get store, insert_or_replace/update document in
 store via thread using same connection"""


# Create method which will be used for threads
def run_thread(name, conn):
    """Insert and then update 15 documents over the given connection.

    :param name: label printed when the thread starts and when it is done
    :param conn: connection object used to get the store and the mutation
    """
    # Announce the start of the thread
    print('\n Start thread ', name)

    # Store that receives the documents
    document_store = conn.get_or_create_store('/tmp/store_name')

    # The same 15 string ids are used for the inserts and for the updates
    doc_ids = [str(number) for number in range(15)]

    # Insert the documents, represented as Python dictionaries
    for doc_id in doc_ids:
        document_store.insert_or_replace(doc={'_id': doc_id, 'name': 'Greg'})

    # Build one mutation that overwrites the 'name' field
    rename = conn.new_mutation()
    rename.set_or_replace(field_path='name', value='T')

    # Apply the mutation to every inserted document
    for doc_id in doc_ids:
        document_store.update(_id=doc_id, mutation=rename)

    # Announce the end of the thread
    print('\n Done thread ', name)


# Build the OJAI connection string: host:port followed by the
# authentication, credential and SSL options
connection_str = (
    "localhost:5678?auth=basic;user=mapr;password=mapr;"
    "ssl=true;"
    "sslCA=/opt/mapr/conf/ssl_truststore.pem;"
    "sslTargetNameOverride=node1.mapr.com"
)

# A connection can be shared between threads,
# so one connection instance is enough for all of them
connection = ConnectionFactory.get_connection(connection_str)

# Start 10 threads that all use the same connection instance
for i in range(10):
    thread.start_new_thread(
        run_thread, ('Thread-{}'.format(i), connection))

# start_new_thread() does not return a thread object whose status
# could be checked, so simply give the threads 10 seconds to finish
time.sleep(10)

# Close connection
connection.close()

The following code example uses the threading module. It is available at 016_threading_example.py.

"""Following example works with Python Client"""
import threading
import time

from mapr.ojai.storage.ConnectionFactory import ConnectionFactory

"""Create a connection, get store, insert_or_replace/update document in
 store via thread using same connection"""

# Build the OJAI connection string: host:port followed by the
# authentication, credential and SSL options
connection_str = (
    "localhost:5678?auth=basic;user=mapr;password=mapr;"
    "ssl=true;"
    "sslCA=/opt/mapr/conf/ssl_truststore.pem;"
    "sslTargetNameOverride=node1.mapr.com"
)

# A connection can be shared between threads,
# so one connection instance is enough for all of them
connection = ConnectionFactory.get_connection(connection_str)


# Create a Thread subclass for the sample threading implementation
class MyThread(threading.Thread):
    """Thread that inserts and then updates 15 documents over a connection."""

    def __init__(self, name, connection):
        """Store the thread name and the connection the thread works with.

        :param name: thread name, printed when the thread starts and ends
        :param connection: connection object used by run()
        """
        threading.Thread.__init__(self)
        self.name = name
        self.connection = connection

    def run(self):
        """Insert 15 documents into the store and update each of them."""
        # Print that thread started with threadName
        print('\n Start thread ', self.name)

        # Use the connection given to this instance rather than a
        # module-level global, so the class works with any connection
        store = self.connection.get_or_create_store('/tmp/store_name')

        # Insert 15 documents, represented as Python dictionaries,
        # into DocumentStore
        for index in range(15):
            store.insert_or_replace(doc={'_id': str(index), 'name': 'Greg'})

        # Create DocumentMutation object using the OJAIConnection object
        mutation = self.connection.new_mutation()

        # Set mutation value
        mutation.set_or_replace(field_path='name', value='T')

        # Update the 15 documents in the store
        for index in range(15):
            store.update(_id=str(index), mutation=mutation)

        # Print that thread done with threadName
        print('\n Done thread ', self.name)


# This thread implementation returns thread objects,
# so thread status can be checked via native methods.
# Simple waiter for a list of threads:
def waiter(threads):
    """Block until every running thread in *threads* has finished.

    :param threads: iterable of ``threading.Thread`` objects; threads that
        are not alive (finished or never started) are skipped
    """
    for my_thread in threads:
        # join() blocks exactly until the thread ends, so no polling,
        # sleeping or recursion is needed; the is_alive() check keeps
        # never-started threads from raising in join()
        if my_thread.is_alive():
            my_thread.join()


# Started threads are collected here so that they can be waited for
thread_list = []

# Create and run 10 threads
for number in range(10):
    # Every thread gets its own name and the shared connection object
    worker = MyThread(name='Thread-{0}'.format(number),
                      connection=connection)

    # Start the thread, then remember it for the waiter
    worker.start()
    thread_list.append(worker)

# Wait until all threads have finished
waiter(thread_list)

# Close connection
connection.close()