Thread Safety issues

Nov 21, 2011 at 9:27 AM
Edited Nov 21, 2011 at 9:34 AM

Imagine the test export/import:

        /// <summary>
        /// Empty part type exported under the contract name "contract".
        /// </summary>
        [Export("contract")]
        public class Exported
        {
        }

        /// <summary>
        /// Holder for a collection import of everything exported as "contract".
        /// </summary>
        public class Imported
        {
            // AllowRecomposition = true marks this import as recomposable; the tests
            // read Items after catalogs were added post-composition.
            [ImportMany("contract", AllowRecomposition = true)]
            public object[] Items { get; set; }
        }

Use case 1:

        /// <summary>
        /// Use case 1: queues 100 thread-pool work items that each add a
        /// <c>TypeCatalog</c> for <c>Exported</c> to one shared <c>AggregateCatalog</c>,
        /// with no locking by the caller.
        /// </summary>
        private static void Test1()
        {
            AggregateCatalog catalog = new AggregateCatalog();

            // The container is created with the IsThreadSafe option.
            CompositionContainer container = new CompositionContainer(catalog, CompositionOptions.IsThreadSafe);

            // Composed once, before any catalog has been added to the aggregate.
            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);

            for (int i = 0; i < 100; i++)
            {
                ThreadPool.QueueUserWorkItem(c =>
                {
                    // The shared catalog arrives as the work item's state object.
                    AggregateCatalog aggregate = (AggregateCatalog)c;

                    // Add without any caller-side synchronization.
                    aggregate.Catalogs.Add(new TypeCatalog(typeof(Exported)));
                },
                catalog);
            }

            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);

            // Writes whether Items holds exactly 100 entries (one per queued add).
            Debug.WriteLine(recomposable.Items.Length == 100);
        }

The result is not predictable; depending on the run, test execution can end with:

  • proper importing, no exception
  • the imported collection contains fewer exported values than expected (<100 in the sample above), no exception
  • null reference exception
  • collection modified exception

When adding locking like:

        /// <summary>
        /// Use case 1 with caller-side locking: queues 1000 thread-pool work items that
        /// each add a <c>TypeCatalog</c> for <c>Exported</c> to one shared
        /// <c>AggregateCatalog</c> while holding a lock on that catalog.
        /// </summary>
        private static void Test1()
        {
            AggregateCatalog catalog = new AggregateCatalog();

            // The container is created with the IsThreadSafe option.
            CompositionContainer container = new CompositionContainer(catalog, CompositionOptions.IsThreadSafe);

            // Composed once, before any catalog has been added to the aggregate.
            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);

            for (int i = 0; i < 1000; i++)
            {
                ThreadPool.QueueUserWorkItem(c =>
                {
                    // The shared catalog arrives as the work item's state object.
                    AggregateCatalog aggregate = (AggregateCatalog)c;

                    // Every work item locks the same catalog instance, so the adds are serialized.
                    lock (aggregate)
                    {
                        aggregate.Catalogs.Add(new TypeCatalog(typeof(Exported)));
                    }
                },
                catalog);
            }

            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);

            // Writes whether Items holds exactly 1000 entries (one per queued add).
            Debug.WriteLine(recomposable.Items.Length == 1000);
        }

The threading issues no longer occur.

Use case 2:

        /// <summary>
        /// Use case 2: nests two child <c>AggregateCatalog</c>s (c1, c2) inside a parent
        /// catalog and queues 1000 locked adds against each child.
        /// </summary>
        private static void Test2()
        {
            AggregateCatalog catalog = new AggregateCatalog();

            AggregateCatalog c1 = new AggregateCatalog();
            AggregateCatalog c2 = new AggregateCatalog();

            // Both children are registered with the parent before the container is built.
            catalog.Catalogs.Add(c1);
            catalog.Catalogs.Add(c2);

            // The container is built over the parent catalog with the IsThreadSafe option.
            CompositionContainer container = new CompositionContainer(catalog, CompositionOptions.IsThreadSafe);

            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);

            for (int i = 0; i < 1000; i++)
            {
                // CallbackAddLocked locks only the catalog it is handed (c1 or c2),
                // so adds to c1 and adds to c2 do not exclude each other.
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c1);
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c2);
            }

            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);

            // Writes whether Items holds exactly 2000 entries (1000 adds per child catalog).
            Debug.WriteLine(recomposable.Items.Length == 2000);
        }


        /// <summary>
        /// Thread-pool callback that adds a <c>TypeCatalog</c> for <c>Exported</c> to the
        /// given catalog while holding a lock on that catalog instance.
        /// </summary>
        /// <param name="state">The target catalog; cast to <c>AggregateCatalog</c>.</param>
        private static void CallbackAddLocked(object state)
        {
            AggregateCatalog catalog = (AggregateCatalog)state;

            // The lock object is the catalog itself, so only callers passing the same
            // catalog instance are serialized against each other.
            lock (catalog)
            {
                catalog.Catalogs.Add(new TypeCatalog(typeof(Exported)));
            }
        }

Even with locking on c1 and c2, the threading issues still seem to occur (caused by the unsynchronized parent catalog).

Use case 3:

        /// <summary>
        /// Use case 3: gives each of two catalogs (c1, c2) its own
        /// <c>CatalogExportProvider</c>, combines both providers in one container, and
        /// queues 1000 locked adds against each catalog.
        /// </summary>
        private static void Test3()
        {
            AggregateCatalog c1 = new AggregateCatalog();
            AggregateCatalog c2 = new AggregateCatalog();

            // One export provider per catalog, each created with the IsThreadSafe option.
            CatalogExportProvider provider1 = new CatalogExportProvider(c1, CompositionOptions.IsThreadSafe);
            CatalogExportProvider provider2 = new CatalogExportProvider(c2, CompositionOptions.IsThreadSafe);

            CompositionContainer container = new CompositionContainer(CompositionOptions.IsThreadSafe, provider1, provider2);

            // Both providers are pointed back at the container as their source provider.
            provider1.SourceProvider = container;
            provider2.SourceProvider = container;

            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);

            for (int i = 0; i < 1000; i++)
            {
                // CallbackAddLocked locks only the catalog it is handed (c1 or c2).
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c1);
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c2);
            }

            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);

            // Writes whether Items holds exactly 2000 entries (1000 adds per catalog).
            Debug.WriteLine(recomposable.Items.Length == 2000);
        }

The results are the same as in the previous use cases.


 

Q: Is it expected or unexpected behavior?

 

Thanks for help,

Bartek


Stack traces:

   at System.ThrowHelper.ThrowInvalidOperationException(ExceptionResource resource)
   at System.Collections.Generic.List`1.Enumerator.MoveNextRare()
   at System.Collections.Generic.List`1.Enumerator.MoveNext()
   at Microsoft.Internal.Collections.WeakReferenceCollection`1.AliveItemsToList()
   at System.ComponentModel.Composition.Hosting.ImportEngine.RecompositionManager.UpdateImportIndex()
   at System.ComponentModel.Composition.Hosting.ImportEngine.RecompositionManager.GetAffectedParts(IEnumerable`1 changedContractNames)
   at System.ComponentModel.Composition.Hosting.ImportEngine.OnExportsChanging(Object sender, ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionServices.TryFire[TEventArgs](EventHandler`1 _delegate, Object sender, TEventArgs e)
   at System.ComponentModel.Composition.Hosting.ExportProvider.OnExportsChanging(ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionContainer.OnExportsChangingInternal(Object sender, ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionServices.TryFire[TEventArgs](EventHandler`1 _delegate, Object sender, TEventArgs e)
   at System.ComponentModel.Composition.Hosting.ExportProvider.OnExportsChanging(ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.AggregateExportProvider.OnExportChangingInternal(Object sender, ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionServices.TryFire[TEventArgs](EventHandler`1 _delegate, Object sender, TEventArgs e)
   at System.ComponentModel.Composition.Hosting.ExportProvider.OnExportsChanging(ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CatalogExportProvider.OnCatalogChanging(Object sender, ComposablePartCatalogChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.ComposablePartCatalogCollection.OnChanging(Object sender, ComposablePartCatalogChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.AggregateCatalog.OnChanging(ComposablePartCatalogChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.ComposablePartCatalogCollection.RaiseChangingEvent(Lazy`1 addedDefinitions, Lazy`1 removedDefinitions, AtomicComposition atomicComposition)
   at System.ComponentModel.Composition.Hosting.ComposablePartCatalogCollection.Add(ComposablePartCatalog item)

 

   at Microsoft.Internal.Collections.WeakReferenceCollection`1.AliveItemsToList()
   at System.ComponentModel.Composition.Hosting.ImportEngine.RecompositionManager.UpdateImportIndex()
   at System.ComponentModel.Composition.Hosting.ImportEngine.RecompositionManager.GetAffectedParts(IEnumerable`1 changedContractNames)
   at System.ComponentModel.Composition.Hosting.ImportEngine.OnExportsChanging(Object sender, ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionServices.TryFire[TEventArgs](EventHandler`1 _delegate, Object sender, TEventArgs e)
   at System.ComponentModel.Composition.Hosting.ExportProvider.OnExportsChanging(ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionContainer.OnExportsChangingInternal(Object sender, ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionServices.TryFire[TEventArgs](EventHandler`1 _delegate, Object sender, TEventArgs e)
   at System.ComponentModel.Composition.Hosting.ExportProvider.OnExportsChanging(ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.AggregateExportProvider.OnExportChangingInternal(Object sender, ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CompositionServices.TryFire[TEventArgs](EventHandler`1 _delegate, Object sender, TEventArgs e)
   at System.ComponentModel.Composition.Hosting.ExportProvider.OnExportsChanging(ExportsChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.CatalogExportProvider.OnCatalogChanging(Object sender, ComposablePartCatalogChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.ComposablePartCatalogCollection.OnChanging(Object sender, ComposablePartCatalogChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.AggregateCatalog.OnChanging(ComposablePartCatalogChangeEventArgs e)
   at System.ComponentModel.Composition.Hosting.ComposablePartCatalogCollection.RaiseChangingEvent(Lazy`1 addedDefinitions, Lazy`1 removedDefinitions, AtomicComposition atomicComposition)
   at System.ComponentModel.Composition.Hosting.ComposablePartCatalogCollection.Add(ComposablePartCatalog item)

Nov 21, 2011 at 7:46 PM

Thank you for pointing this out. We have checked our MSDN reference documentation on this scenario and found it contradictory; I have created an entry in the issue tracker for this http://mef.codeplex.com/workitem/14449 and we will follow up there after investigating.

Nick

From: gorskib [email removed]
Sent: Monday, November 21, 2011 1:27 AM
To: Nicholas Blumhardt
Subject: Thread Safety issues [MEF:280220]

From: gorskib

Imagine the test export/import:

        /// <summary>
        /// Empty part type exported under the contract name "contract".
        /// </summary>
        [Export("contract")]
        public class Exported
        {
        }
 
        /// <summary>
        /// Holder for a collection import of everything exported as "contract".
        /// </summary>
        public class Imported
        {
            // AllowRecomposition = true marks this import as recomposable; the tests
            // read Items after catalogs were added post-composition.
            [ImportMany("contract", AllowRecomposition = true)]
            public object[] Items { get; set; }
        }

Use case 1:

        /// <summary>
        /// Use case 1: queues 100 thread-pool work items that each add a
        /// <c>TypeCatalog</c> for <c>Exported</c> to one shared <c>AggregateCatalog</c>,
        /// with no locking by the caller.
        /// </summary>
        private static void Test1()
        {
            AggregateCatalog catalog = new AggregateCatalog();
 
            // The container is created with the IsThreadSafe option.
            CompositionContainer container = new CompositionContainer(catalog, CompositionOptions.IsThreadSafe);
 
            // Composed once, before any catalog has been added to the aggregate.
            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);
 
            for (int i = 0; i < 100; i++)
            {
                ThreadPool.QueueUserWorkItem(c =>
                {
                    // The shared catalog arrives as the work item's state object.
                    AggregateCatalog aggregate = (AggregateCatalog)c;
 
                    // Add without any caller-side synchronization.
                    aggregate.Catalogs.Add(new TypeCatalog(typeof(Exported)));
                },
                catalog);
            }
 
            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);
 
            // Writes whether Items holds exactly 100 entries (one per queued add).
            Debug.WriteLine(recomposable.Items.Length == 100);
        }

The result is not predictable; depending on the run, test execution can end with:

  • proper importing, no exception
  • the imported collection contains fewer exported values than expected (<100 in the sample above), no exception
  • null reference exception
  • collection modified exception

When adding locking like:

        /// <summary>
        /// Use case 1 with caller-side locking: queues 1000 thread-pool work items that
        /// each add a <c>TypeCatalog</c> for <c>Exported</c> to one shared
        /// <c>AggregateCatalog</c> while holding a lock on that catalog.
        /// </summary>
        private static void Test1()
        {
            AggregateCatalog catalog = new AggregateCatalog();

            // The container is created with the IsThreadSafe option.
            CompositionContainer container = new CompositionContainer(catalog, CompositionOptions.IsThreadSafe);

            // Composed once, before any catalog has been added to the aggregate.
            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);

            for (int i = 0; i < 1000; i++)
            {
                ThreadPool.QueueUserWorkItem(c =>
                {
                    // The shared catalog arrives as the work item's state object.
                    AggregateCatalog aggregate = (AggregateCatalog)c;

                    // Every work item locks the same catalog instance, so the adds are serialized.
                    lock (aggregate)
                    {
                        aggregate.Catalogs.Add(new TypeCatalog(typeof(Exported)));
                    }
                },
                catalog);
            }

            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);

            // The expected count must match the loop bound above (1000 queued adds);
            // comparing against 100 would report False even when every add was imported.
            Debug.WriteLine(recomposable.Items.Length == 1000);
        }

The threading issues are not occurring any more.

Use case 2:

        /// <summary>
        /// Use case 2: nests two child <c>AggregateCatalog</c>s (c1, c2) inside a parent
        /// catalog and queues 1000 locked adds against each child.
        /// </summary>
        private static void Test2()
        {
            AggregateCatalog catalog = new AggregateCatalog();
 
            AggregateCatalog c1 = new AggregateCatalog();
            AggregateCatalog c2 = new AggregateCatalog();
 
            // Both children are registered with the parent before the container is built.
            catalog.Catalogs.Add(c1);
            catalog.Catalogs.Add(c2);
 
            // The container is built over the parent catalog with the IsThreadSafe option.
            CompositionContainer container = new CompositionContainer(catalog, CompositionOptions.IsThreadSafe);
 
            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);
 
            for (int i = 0; i < 1000; i++)
            {
                // CallbackAddLocked locks only the catalog it is handed (c1 or c2),
                // so adds to c1 and adds to c2 do not exclude each other.
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c1);
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c2);
            }
 
            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);
 
            // Writes whether Items holds exactly 2000 entries (1000 adds per child catalog).
            Debug.WriteLine(recomposable.Items.Length == 2000);
        }
 
 
        /// <summary>
        /// Thread-pool callback that adds a <c>TypeCatalog</c> for <c>Exported</c> to the
        /// given catalog while holding a lock on that catalog instance.
        /// </summary>
        /// <param name="state">The target catalog; cast to <c>AggregateCatalog</c>.</param>
        private static void CallbackAddLocked(object state)
        {
            AggregateCatalog catalog = (AggregateCatalog)state;
 
            // The lock object is the catalog itself, so only callers passing the same
            // catalog instance are serialized against each other.
            lock (catalog)
            {
                catalog.Catalogs.Add(new TypeCatalog(typeof(Exported)));
            }
        }

Even with locking on c1 and c2, the threading issues still seem to occur (caused by the unsynchronized parent catalog).

Use case 3:

        /// <summary>
        /// Use case 3: gives each of two catalogs (c1, c2) its own
        /// <c>CatalogExportProvider</c>, combines both providers in one container, and
        /// queues 1000 locked adds against each catalog.
        /// </summary>
        private static void Test3()
        {
            AggregateCatalog c1 = new AggregateCatalog();
            AggregateCatalog c2 = new AggregateCatalog();
 
            // One export provider per catalog, each created with the IsThreadSafe option.
            CatalogExportProvider provider1 = new CatalogExportProvider(c1, CompositionOptions.IsThreadSafe);
            CatalogExportProvider provider2 = new CatalogExportProvider(c2, CompositionOptions.IsThreadSafe);
 
            CompositionContainer container = new CompositionContainer(CompositionOptions.IsThreadSafe, provider1, provider2);
 
            // Both providers are pointed back at the container as their source provider.
            provider1.SourceProvider = container;
            provider2.SourceProvider = container;
 
            Imported recomposable = new Imported();
            container.ComposeParts(recomposable);
 
            for (int i = 0; i < 1000; i++)
            {
                // CallbackAddLocked locks only the catalog it is handed (c1 or c2).
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c1);
                ThreadPool.QueueUserWorkItem(CallbackAddLocked, c2);
            }
 
            // Fixed 5-second wait; nothing here confirms that every work item has finished.
            Thread.Sleep(5000);
 
            // Writes whether Items holds exactly 2000 entries (1000 adds per catalog).
            Debug.WriteLine(recomposable.Items.Length == 2000);
        }

The results are the same as in the previous use cases.


Q: Is it expected or unexpected behavior?

Thanks for help,

Bartek